Blob Blame History Raw
/*
 * Copyright (C) 2014 Intel Corporation. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include "Thread.h"
#include "Functional.h"
#include "log.h"

#include <assert.h>

namespace YamiMediaCodec {

Thread::Thread(const char* name)
    : m_name(name)
    , m_started(false)
    , m_thread(INVALID_ID)
    , m_cond(m_lock)
    , m_sent(m_lock)
{
}

bool Thread::start()
{
    AutoLock lock(m_lock);
    if (m_started)
        return false;
    if (pthread_create(&m_thread, NULL, init, this)) {
        ERROR("create thread %s failed", m_name.c_str());
        m_thread = INVALID_ID;
        return false;
    }
    m_started = true;
    return true;
}

void* Thread::init(void* thread)
{
    Thread* t = (Thread*)thread;
    t->loop();
    return NULL;
}

void Thread::loop()
{
    while (1) {
        AutoLock lock(m_lock);
        if (m_queue.empty()) {
            if (!m_started)
                return;
            m_cond.wait();
        }
        else {
            Job& r = m_queue.front();
            m_lock.release();
            r();
            m_lock.acquire();
            m_queue.pop_front();
        }
    }
}

void Thread::enqueue(const Job& r)
{
    m_queue.push_back(r);
    m_cond.signal();
}

void Thread::post(const Job& r)
{
    AutoLock lock(m_lock);
    if (!m_started) {
        ERROR("%s: post job after stop()", m_name.c_str());
        return;
    }
    enqueue(r);
}

void Thread::sendJob(const Job& r, bool& flag)
{
    r();

    //flag need protect here since we check it in other thread
    AutoLock lock(m_lock);
    flag = true;
    m_sent.broadcast();
}

bool Thread::send(const Job& c)
{
    bool flag = false;

    AutoLock lock(m_lock);
    if (!m_started) {
        ERROR("%s: sent job after stop()", m_name.c_str());
        return false;
    }
    enqueue(std::bind(&Thread::sendJob, this, std::ref(c), std::ref(flag)));
    //wait for result;
    while (!flag) {
        m_sent.wait();
    }
    return true;
}

void Thread::stop()
{
    {
        AutoLock lock(m_lock);
        if (!m_started)
            return;
        m_started = false;
        m_cond.signal();
    }
    pthread_join(m_thread, NULL);
    m_thread = INVALID_ID;
    assert(m_queue.empty());
}

bool Thread::isCurrent()
{
    return pthread_self() == m_thread;
}

Thread::~Thread()
{
    stop();
}
}