/*
* Copyright (C) 2015 - EfficiOS Inc., Alexandre Montplaisir <alexmonthy@efficios.com>
* Copyright (C) 2013 - David Goulet <dgoulet@efficios.com>
*
* This library is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License, version 2.1 only,
* as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
* for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this library; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
package org.lttng.ust.agent;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.lttng.ust.agent.client.ILttngTcpClientListener;
import org.lttng.ust.agent.client.LttngTcpSessiondClient;
import org.lttng.ust.agent.filter.FilterChangeNotifier;
import org.lttng.ust.agent.session.EventRule;
import org.lttng.ust.agent.utils.LttngUstAgentLogger;
/**
* Base implementation of a {@link ILttngAgent}.
*
* @author Alexandre Montplaisir
* @param <T>
* The type of logging handler that should register to this agent
*/
public abstract class AbstractLttngAgent<T extends ILttngHandler>
implements ILttngAgent<T>, ILttngTcpClientListener {
private static final String WILDCARD = "*";
private static final int INIT_TIMEOUT = 3; /* Seconds */
/** The handlers registered to this agent */
private final Set<T> registeredHandlers = new HashSet<T>();
/**
* The trace events currently enabled in the sessions.
*
* The key represents the event name, the value is the ref count (how many
* different sessions currently have this event enabled). Once the ref count
* falls to 0, this means we can avoid sending log events through JNI
* because nobody wants them.
*
* It uses a concurrent hash map, so that the {@link #isEventEnabled} and
* read methods do not need to take a synchronization lock.
*/
private final Map<String, Integer> enabledEvents = new ConcurrentHashMap<String, Integer>();
/**
* The trace events prefixes currently enabled in the sessions, which means
* the event names finishing in *, like "abcd*". We track them separately
* from the standard event names, so that we can use {@link String#equals}
* and {@link String#startsWith} appropriately.
*
* We track the lone wildcard "*" separately, in {@link #enabledWildcards}.
*/
private final NavigableMap<String, Integer> enabledEventPrefixes =
new ConcurrentSkipListMap<String, Integer>();
/** Number of sessions currently enabling the wildcard "*" event */
private final AtomicInteger enabledWildcards = new AtomicInteger(0);
/**
* The application contexts currently enabled in the tracing sessions.
*
* It is first indexed by context retriever, then by context name. This
* allows to efficiently query all the contexts for a given retriever.
*
* Works similarly as {@link #enabledEvents}, but for app contexts (and with
* an extra degree of indexing).
*
* TODO Could be changed to a Guava Table once/if we start using it.
*/
private final Map<String, Map<String, Integer>> enabledAppContexts = new ConcurrentHashMap<String, Map<String, Integer>>();
/** Tracing domain. Defined by the sub-classes via the constructor. */
private final Domain domain;
/* Lazy-loaded sessiond clients and their thread objects */
private LttngTcpSessiondClient rootSessiondClient = null;
private LttngTcpSessiondClient userSessiondClient = null;
private Thread rootSessiondClientThread = null;
private Thread userSessiondClientThread = null;
/** Indicates if this agent has been initialized. */
private boolean initialized = false;
/**
* Constructor. Should only be called by sub-classes via super(...);
*
* @param domain
* The tracing domain of this agent.
*/
protected AbstractLttngAgent(Domain domain) {
this.domain = domain;
}
@Override
public Domain getDomain() {
return domain;
}
@Override
public void registerHandler(T handler) {
synchronized (registeredHandlers) {
if (registeredHandlers.isEmpty()) {
/*
* This is the first handler that registers, we will initialize
* the agent.
*/
init();
}
registeredHandlers.add(handler);
}
}
@Override
public void unregisterHandler(T handler) {
synchronized (registeredHandlers) {
registeredHandlers.remove(handler);
if (registeredHandlers.isEmpty()) {
/* There are no more registered handlers, close the connection. */
dispose();
}
}
}
private void init() {
/*
* Only called from a synchronized (registeredHandlers) block, should
* not need additional synchronization.
*/
if (initialized) {
return;
}
LttngUstAgentLogger.log(AbstractLttngAgent.class, "Initializing Agent for domain: " + domain.name());
String rootClientThreadName = "Root sessiond client started by agent: " + this.getClass().getSimpleName();
rootSessiondClient = new LttngTcpSessiondClient(this, getDomain().value(), true);
rootSessiondClientThread = new Thread(rootSessiondClient, rootClientThreadName);
rootSessiondClientThread.setDaemon(true);
rootSessiondClientThread.start();
String userClientThreadName = "User sessiond client started by agent: " + this.getClass().getSimpleName();
userSessiondClient = new LttngTcpSessiondClient(this, getDomain().value(), false);
userSessiondClientThread = new Thread(userSessiondClient, userClientThreadName);
userSessiondClientThread.setDaemon(true);
userSessiondClientThread.start();
/* Give the threads' registration a chance to end. */
if (!rootSessiondClient.waitForConnection(INIT_TIMEOUT)) {
userSessiondClient.waitForConnection(INIT_TIMEOUT);
}
initialized = true;
}
/**
* Dispose the agent
*/
private void dispose() {
LttngUstAgentLogger.log(AbstractLttngAgent.class, "Disposing Agent for domain: " + domain.name());
/*
* Only called from a synchronized (registeredHandlers) block, should
* not need additional synchronization.
*/
rootSessiondClient.close();
userSessiondClient.close();
try {
rootSessiondClientThread.join();
userSessiondClientThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
rootSessiondClient = null;
rootSessiondClientThread = null;
userSessiondClient = null;
userSessiondClientThread = null;
/*
* Send filter change notifications for all event rules currently
* active, then clear them.
*/
FilterChangeNotifier fcn = FilterChangeNotifier.getInstance();
for (Map.Entry<String, Integer> entry : enabledEvents.entrySet()) {
String eventName = entry.getKey();
Integer nb = entry.getValue();
for (int i = 0; i < nb.intValue(); i++) {
fcn.removeEventRules(eventName);
}
}
enabledEvents.clear();
for (Map.Entry<String, Integer> entry : enabledEventPrefixes.entrySet()) {
/* Re-add the * at the end, the FCN tracks the rules that way */
String eventName = (entry.getKey() + "*");
Integer nb = entry.getValue();
for (int i = 0; i < nb.intValue(); i++) {
fcn.removeEventRules(eventName);
}
}
enabledEventPrefixes.clear();
int wildcardRules = enabledWildcards.getAndSet(0);
for (int i = 0; i < wildcardRules; i++) {
fcn.removeEventRules(WILDCARD);
}
/*
* Also clear tracked app contexts (no filter notifications sent for
* those currently).
*/
enabledAppContexts.clear();
initialized = false;
}
@Override
public boolean eventEnabled(EventRule eventRule) {
/* Notify the filter change manager of the command */
FilterChangeNotifier.getInstance().addEventRule(eventRule);
String eventName = eventRule.getEventName();
if (eventName.equals(WILDCARD)) {
enabledWildcards.incrementAndGet();
return true;
}
if (eventName.endsWith(WILDCARD)) {
/* Strip the "*" from the name. */
String prefix = eventName.substring(0, eventName.length() - 1);
return incrementRefCount(prefix, enabledEventPrefixes);
}
return incrementRefCount(eventName, enabledEvents);
}
@Override
public boolean eventDisabled(String eventName) {
/* Notify the filter change manager of the command */
FilterChangeNotifier.getInstance().removeEventRules(eventName);
if (eventName.equals(WILDCARD)) {
int newCount = enabledWildcards.decrementAndGet();
if (newCount < 0) {
/* Event was not enabled, bring the count back to 0 */
enabledWildcards.incrementAndGet();
return false;
}
return true;
}
if (eventName.endsWith(WILDCARD)) {
/* Strip the "*" from the name. */
String prefix = eventName.substring(0, eventName.length() - 1);
return decrementRefCount(prefix, enabledEventPrefixes);
}
return decrementRefCount(eventName, enabledEvents);
}
@Override
public boolean appContextEnabled(String contextRetrieverName, String contextName) {
synchronized (enabledAppContexts) {
Map<String, Integer> retrieverMap = enabledAppContexts.get(contextRetrieverName);
if (retrieverMap == null) {
/* There is no submap for this retriever, let's create one. */
retrieverMap = new ConcurrentHashMap<String, Integer>();
enabledAppContexts.put(contextRetrieverName, retrieverMap);
}
return incrementRefCount(contextName, retrieverMap);
}
}
@Override
public boolean appContextDisabled(String contextRetrieverName, String contextName) {
synchronized (enabledAppContexts) {
Map<String, Integer> retrieverMap = enabledAppContexts.get(contextRetrieverName);
if (retrieverMap == null) {
/* There was no submap for this retriever, invalid command? */
return false;
}
boolean ret = decrementRefCount(contextName, retrieverMap);
/* If the submap is now empty we can remove it from the main map. */
if (retrieverMap.isEmpty()) {
enabledAppContexts.remove(contextRetrieverName);
}
return ret;
}
}
/*
* Implementation of this method is domain-specific.
*/
@Override
public abstract Collection<String> listAvailableEvents();
@Override
public boolean isEventEnabled(String eventName) {
/* If at least one session enabled the "*" wildcard, send the event */
if (enabledWildcards.get() > 0) {
return true;
}
/* Check if at least one session wants this exact event name */
if (enabledEvents.containsKey(eventName)) {
return true;
}
/* Look in the enabled prefixes if one of them matches the event */
String potentialMatch = enabledEventPrefixes.floorKey(eventName);
if (potentialMatch != null && eventName.startsWith(potentialMatch)) {
return true;
}
return false;
}
@Override
public Collection<Map.Entry<String, Map<String, Integer>>> getEnabledAppContexts() {
return enabledAppContexts.entrySet();
}
private static boolean incrementRefCount(String key, Map<String, Integer> refCountMap) {
synchronized (refCountMap) {
Integer count = refCountMap.get(key);
if (count == null) {
/* This is the first instance of this event being enabled */
refCountMap.put(key, Integer.valueOf(1));
return true;
}
if (count.intValue() <= 0) {
/* It should not have been in the map in the first place! */
throw new IllegalStateException();
}
/* The event was already enabled, increment its refcount */
refCountMap.put(key, Integer.valueOf(count.intValue() + 1));
return true;
}
}
private static boolean decrementRefCount(String key, Map<String, Integer> refCountMap) {
synchronized (refCountMap) {
Integer count = refCountMap.get(key);
if (count == null || count.intValue() <= 0) {
/*
* The sessiond asked us to disable an event that was not
* enabled previously. Command error?
*/
return false;
}
if (count.intValue() == 1) {
/*
* This is the last instance of this event being disabled,
* remove it from the map so that we stop sending it.
*/
refCountMap.remove(key);
return true;
}
/*
* Other sessions are still looking for this event, simply decrement
* its refcount.
*/
refCountMap.put(key, Integer.valueOf(count.intValue() - 1));
return true;
}
}
}