Blob Blame History Raw
# -*- coding: utf-8 -*-

#  Copyright (C) 2014 - Garrett Regier
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc.  51 Franklin Street, Fifth Floor, Boston, MA
#  02110-1301 USA.

from gi.repository import GLib

import abc
import collections
import queue
import threading
import traceback

from .debug import debug


class WorkerThread(threading.Thread):
    __metaclass__ = abc.ABCMeta

    __sentinel = object()

    def __init__(self, callback, chunk_size=1, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.__callback = callback
        self.__chunk_size = chunk_size

        self.__quit = threading.Event()
        self.__has_idle = threading.Event()

        self.__tasks = queue.Queue()
        self.__results = collections.deque()

    @abc.abstractmethod
    def handle_task(self, *args, **kwargs):
        raise NotImplementedError

    # TODO: add, put, push?
    def push(self, *args, **kwargs):
        self.__tasks.put((args, kwargs))

    def __close(self, process_results):
        self.__quit.set()

        # Prevent the queue.get() from blocking forever
        self.__tasks.put(self.__sentinel)

        super().join()

        if not process_results:
            self.__results.clear()

        else:
            while self.__in_idle() is GLib.SOURCE_CONTINUE:
                pass

    def terminate(self):
        self.__close(False)

    def join(self):
        self.__close(True)

    def clear(self):
        old_tasks = self.__tasks
        self.__tasks = queue.Queue(1)

        # Prevent the queue.get() from blocking forever
        old_tasks.put(self.__sentinel)

        # Block until the old queue has finished, otherwise
        # a old result could be added to the new results queue
        self.__tasks.put(self.__sentinel)
        self.__tasks.put(self.__sentinel)

        old_tasks = self.__tasks
        self.__tasks = queue.Queue()

        # Switch to the new queue
        old_tasks.put(self.__sentinel)

        # Finally, we can now create a new deque without
        # the possibility of any old results being added to it
        self.__results.clear()

    def run(self):
        while not self.__quit.is_set():
            task = self.__tasks.get()
            if task is self.__sentinel:
                continue

            args, kwargs = task

            try:
                result = self.handle_task(*args, **kwargs)

            except Exception:
                traceback.print_exc()
                continue

            self.__results.append(result)

            # Avoid having an idle for every result
            if not self.__has_idle.is_set():
                self.__has_idle.set()

                debug('%s<%s>: result callback idle started' %
                      (type(self).__name__, self.name))
                GLib.source_set_name_by_id(GLib.idle_add(self.__in_idle),
                                           '[gedit] git %s result callback idle' %
                                           (type(self).__name__,))

    def __in_idle(self):
        try:
            for i in range(self.__chunk_size):
                result = self.__results.popleft()

                try:
                    self.__callback(result)

                except Exception:
                    traceback.print_exc()

        except IndexError:
            # Must be cleared before we check the results length
            self.__has_idle.clear()

            # Only remove the idle when there are no more items,
            # some could have been added after the IndexError was raised
            if len(self.__results) == 0:
                debug('%s<%s>: result callback idle finished' %
                      (type(self).__name__, self.name))
                return GLib.SOURCE_REMOVE

        return GLib.SOURCE_CONTINUE

# ex:ts=4:et: