#
# Copyright (c) 2020 Red Hat, Inc.
#
# This file is part of nmstate
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import datetime
import logging
from libnmstate.error import NmstateInternalError
from libnmstate.error import NmstateTimeoutError
from .common import NM
from .common import GLib
from .common import Gio
# Interval for idle checker to check on whether timeout should trigger since
# last finish async action.
IDLE_CHECK_INTERNAL = 5
# libnm dbus connection has reply timeout 25 seconds.
IDLE_TIMEOUT = 25
# NetworkManage is using dbus in libnm while the dbus has limitation on
# maximum number of pending replies per connection.(RHEL/CentOS 8 is 1024)
# Hence limit the synchronous queue size
SLOW_ASYNC_QUEUE_SIZE = 100
FAST_ASYNC_QUEUE_SIZE = 300
class NmContext:
def __init__(self):
self._client = NM.Client.new(cancellable=None)
self._context = self._client.get_main_context()
self._quitting = False
self._cancellable = None
self._error = None
self._timeout_source = None
self._last_async_finish_time = None
self._fast_queue = None
self._slow_queue = None
self._init_queue()
self._init_cancellable()
def _init_queue(self):
self._fast_queue = set()
self._slow_queue = set()
def _init_cancellable(self):
self._cancellable = Gio.Cancellable.new()
@property
def cancellable(self):
return self._cancellable
@property
def client(self):
if self._quitting:
return None
return self._client
@property
def context(self):
if not self._context:
raise NmstateInternalError(
"BUG: Accessing MainContext while it is None"
)
return self._context
def refresh_content(self):
if self.context:
while self.context.iteration(False):
pass
def clean_up(self):
if self._cancellable:
self._cancellable.cancel()
self._del_timeout()
self._del_client()
self._context = None
self._cancellable = None
def _del_client(self):
if self._client:
is_done = []
is_timeout = []
self._client.get_context_busy_watcher().weak_ref(
lambda: is_done.append(1)
)
self._client = None
self._quitting = True
self.refresh_content()
if not is_done:
timeout_source = GLib.timeout_source_new(50)
try:
timeout_source.set_callback(lambda x: is_timeout.append(1))
timeout_source.attach(self.context)
while not is_done and not is_timeout:
self.context.iteration(True)
finally:
timeout_source.destroy()
if not is_done:
logging.error("BUG: NM.Client is not cleaned")
self._context = None
def _del_timeout(self):
if self._timeout_source:
self._timeout_source.destroy()
self._timeout_source = None
def register_async(self, action, fast=False):
"""
Register action(string) to wait list.
Set fast as True if requested action does not require too much time,
for example: profile modification.
"""
queue = self._fast_queue if fast else self._slow_queue
max_queue = FAST_ASYNC_QUEUE_SIZE if fast else SLOW_ASYNC_QUEUE_SIZE
if len(queue) >= max_queue:
logging.debug(
f"Async queue({max_queue}) full, waiting all existing actions "
"to be finished before registering more async action"
)
# TODO: No need to wait all finish, should continue when the queue
# is considerably empty and ready for new async action.
self.wait_all_finish()
if action in self._fast_queue or action in self._slow_queue:
raise NmstateInternalError(
f"BUG: An existing actions {action} is already registered"
)
logging.debug(f"Async action: {action} started")
queue.add(action)
def finish_async(self, action, suppress_log=False):
"""
Mark action(string) as finished.
"""
self._last_async_finish_time = datetime.datetime.now()
if not suppress_log:
logging.debug(f"Async action: {action} finished")
self._fast_queue.discard(action)
self._slow_queue.discard(action)
def _action_all_finished(self):
return not (len(self._fast_queue) or len(self._slow_queue))
def _idle_timeout_cb(self, _user_data):
if self._error or self._action_all_finished():
return GLib.SOURCE_REMOVE
idle_time = datetime.datetime.now() - self._last_async_finish_time
if idle_time > datetime.timedelta(seconds=IDLE_TIMEOUT):
remaining_actions = self._slow_queue | self._fast_queue
self.fail(
NmstateTimeoutError(f"Action {remaining_actions} timeout")
)
return GLib.SOURCE_REMOVE
else:
return GLib.SOURCE_CONTINUE
def is_cancelled(self):
return self._cancellable.is_cancelled()
def fail(self, exception):
if not self._cancellable.is_cancelled():
if self._error:
logging.error(
f"BUG: There is already a exception assigned: "
f"existing: {self._error}, new exception {exception}"
)
self.cancellable.cancel()
self._del_timeout()
self._error = exception
def wait_all_finish(self):
"""
Block till all async actions been marked as finished via
`finish_async()` or anyone failed by `fail()`.
"""
self._last_async_finish_time = datetime.datetime.now()
if not self._action_all_finished():
self._timeout_source = GLib.timeout_source_new(
IDLE_CHECK_INTERNAL * 1000
)
user_data = None
self._timeout_source.set_callback(self._idle_timeout_cb, user_data)
self._timeout_source.attach(self._context)
while not self._action_all_finished() and not self._error:
self.context.iteration(True)
self._del_timeout()
if self._error:
# The queue and error should be flush and perpare for another run
self._init_queue()
self._init_cancellable()
tmp_error = self._error
self._error = None
# pylint: disable=raising-bad-type
raise tmp_error
# pylint: enable=raising-bad-type
def get_nm_dev(self, iface_name):
return self.client.get_device_by_iface(iface_name)