Skip to content
Snippets Groups Projects
Commit 13310b86 authored by Patrick Williams's avatar Patrick Williams
Browse files

Update BackgroundExecutor to use LocklessQueue

This change allows BackgroundExecutor to receive callbacks from multiple
producers.

Bug: 283133164
Test: BackgroundExecutorTest
Change-Id: I63f3266184a2ae90b15d7d1b7d9502847e268763
parent 0f0d7b03
No related branches found
No related tags found
No related merge requests found
......@@ -28,29 +28,19 @@ namespace android {
ANDROID_SINGLETON_STATIC_INSTANCE(BackgroundExecutor);
BackgroundExecutor::BackgroundExecutor() : Singleton<BackgroundExecutor>() {
// mSemaphore must be initialized before any calls to
// BackgroundExecutor::sendCallbacks. For this reason, we initialize it
// within the constructor instead of within mThread.
LOG_ALWAYS_FATAL_IF(sem_init(&mSemaphore, 0, 0), "sem_init failed");
mThread = std::thread([&]() {
LOG_ALWAYS_FATAL_IF(sem_init(&mSemaphore, 0, 0), "sem_init failed");
while (!mDone) {
LOG_ALWAYS_FATAL_IF(sem_wait(&mSemaphore), "sem_wait failed (%d)", errno);
ftl::SmallVector<Work*, 10> workItems;
Work* work = mWorks.pop();
while (work) {
workItems.push_back(work);
work = mWorks.pop();
auto callbacks = mCallbacksQueue.pop();
if (!callbacks) {
continue;
}
// Sequence numbers are guaranteed to be in intended order, as we assume a single
// producer and single consumer.
std::stable_sort(workItems.begin(), workItems.end(), [](Work* left, Work* right) {
return left->sequence < right->sequence;
});
for (Work* work : workItems) {
for (auto& task : work->tasks) {
task();
}
delete work;
for (auto& callback : *callbacks) {
callback();
}
}
});
......@@ -66,12 +56,8 @@ BackgroundExecutor::~BackgroundExecutor() {
}
void BackgroundExecutor::sendCallbacks(Callbacks&& tasks) {
Work* work = new Work();
work->sequence = mSequence;
work->tasks = std::move(tasks);
mWorks.push(work);
mSequence++;
mCallbacksQueue.push(std::move(tasks));
LOG_ALWAYS_FATAL_IF(sem_post(&mSemaphore), "sem_post failed");
}
} // namespace android
\ No newline at end of file
} // namespace android
......@@ -16,15 +16,13 @@
#pragma once
#include <Tracing/LocklessStack.h>
#include <android-base/thread_annotations.h>
#include <ftl/small_vector.h>
#include <semaphore.h>
#include <utils/Singleton.h>
#include <mutex>
#include <queue>
#include <thread>
#include "LocklessQueue.h"
namespace android {
// Executes tasks off the main thread.
......@@ -34,24 +32,14 @@ public:
~BackgroundExecutor();
using Callbacks = ftl::SmallVector<std::function<void()>, 10>;
// Queues callbacks onto a work queue to be executed by a background thread.
// Note that this is not thread-safe - a single producer is assumed.
// This is safe to call from multiple threads.
void sendCallbacks(Callbacks&& tasks);
private:
sem_t mSemaphore;
std::atomic_bool mDone = false;
// Sequence number for work items.
// Work items are batched by sequence number. Work items for earlier sequence numbers are
// executed first. Work items with the same sequence number are executed in the same order they
// were added to the stack (meaning the stack must reverse the order after popping from the
// queue)
int32_t mSequence = 0;
struct Work {
int32_t sequence = 0;
Callbacks tasks;
};
LocklessStack<Work> mWorks;
LocklessQueue<Callbacks> mCallbacksQueue;
std::thread mThread;
};
......
......@@ -71,6 +71,7 @@ cc_test {
":libsurfaceflinger_sources",
"libsurfaceflinger_unittest_main.cpp",
"ActiveDisplayRotationFlagsTest.cpp",
"BackgroundExecutorTest.cpp",
"CompositionTest.cpp",
"DisplayIdGeneratorTest.cpp",
"DisplayTransactionTest.cpp",
......
#include <gtest/gtest.h>
#include <condition_variable>
#include "BackgroundExecutor.h"
namespace android {
class BackgroundExecutorTest : public testing::Test {};
namespace {
TEST_F(BackgroundExecutorTest, singleProducer) {
std::mutex mutex;
std::condition_variable condition_variable;
bool backgroundTaskComplete = false;
BackgroundExecutor::getInstance().sendCallbacks(
{[&mutex, &condition_variable, &backgroundTaskComplete]() {
std::lock_guard<std::mutex> lock{mutex};
condition_variable.notify_one();
backgroundTaskComplete = true;
}});
std::unique_lock<std::mutex> lock{mutex};
condition_variable.wait(lock, [&backgroundTaskComplete]() { return backgroundTaskComplete; });
ASSERT_TRUE(backgroundTaskComplete);
}
TEST_F(BackgroundExecutorTest, multipleProducers) {
std::mutex mutex;
std::condition_variable condition_variable;
const int backgroundTaskCount = 10;
int backgroundTaskCompleteCount = 0;
for (int i = 0; i < backgroundTaskCount; i++) {
std::thread([&mutex, &condition_variable, &backgroundTaskCompleteCount]() {
BackgroundExecutor::getInstance().sendCallbacks(
{[&mutex, &condition_variable, &backgroundTaskCompleteCount]() {
std::lock_guard<std::mutex> lock{mutex};
backgroundTaskCompleteCount++;
if (backgroundTaskCompleteCount == backgroundTaskCount) {
condition_variable.notify_one();
}
}});
}).detach();
}
std::unique_lock<std::mutex> lock{mutex};
condition_variable.wait(lock, [&backgroundTaskCompleteCount]() {
return backgroundTaskCompleteCount == backgroundTaskCount;
});
ASSERT_EQ(backgroundTaskCount, backgroundTaskCompleteCount);
}
} // namespace
} // namespace android
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment