最近在从Unity转UE的学习过程中,在学习了几天之后在此做个知识沉淀,希望能够比较理清UE的知识。
要了解UE的渲染模块最开始接触的就是多线程机制,照例个人还是喜欢多图模式来辅助理解。
知乎版链接:https://zhuanlan.zhihu.com/p/622013534
多线程基础
这部分基础讲解主要来自GAMES104,加上亿点点网上资料整理。
这里只整理了部分基础知识,要想系统性的了解多线程则需要学习操作系统和Linux内核。
首先是为什么要有多线程机制?随着现在集成电路的工艺和晶体管密度的提升,人类已经接近触及到量子力学的极限,单核的性能提升越发无力。既然不能把核越做越快,那么要想提升算力的另一个方法就是越做越多。
进程和线程
我们还需要搞清楚进程和线程是什么:进程和线程都是一个时间段的描述,这个时间段是CPU工作时间段。
在CPU看来,任务都是一个一个执行的,按照这样的方法执行:
- 加载进程A的上下文
- 执行进程A的任务
- 保存进程A的上下文
- 加载进程B的上下文
- …
那么一个进程,其实就是上面的123步,是对运行中的程序段的描述,也是对这一段CPU执行时间的描述。
那么线程是什么呢?上面一个进程的颗粒度太大,每次执行都要进行上下文的切换(耗时),一个进程的任务不可能只是一条逻辑执行的,必然会有多个分支和多个程序段,这时就可以把进程A分成a,b,c等多个块组成,那么这里具体的执行就可能变成:
- 加载进程A的上下文
- 开始进程A的任务
- 执行进程A的a小段
- 执行进程A的b小段
- 执行进程A的c小段
- 保存进程A的上下文
- …
这里的abc就是线程,也就是说线程是共享了进程的上下文的更小的CPU时间段,主要共享的是进程的地址空间。
总结:进程和线程都是一个CPU时间段的描述,只不过是颗粒大小不同,线程是依赖于进程内部的,是CPU执行任务的最小单位。
当然这里的概念只是辅助理解,与实际代码中的进程和线程有一些区别。
多任务处理
多任务操作系统分为两类:
- Preemptive Multitasking (抢占式),这种方式会有一个调度器 (Scheduler,一段用于调度进程的程序)来决定此时该执行哪些任务,如果是别的任务则会强制打断当前任务,大部分操作系统就是这种。
- Non-preempive Multitasking (非抢占式) 也可叫做 Cooperative,协作式。这种方式会让当前任务执行到自愿停下来,这个自愿停止的行为成为 yield。但是如果某个任务阻塞了,那么整个系统就阻塞了。
线程上下文切换
上面说进程之间有上下文读取和保存 (切换),其实线程之间也有。CPU在此主要做的任务就是将当前任务的状态保存下来,存到内存中的某个位置,然后加载下一个任务的状态,这个过程就叫做上下文切换。
每个线程都有一个程序计数器 (记录要执行的下一条指令),一组寄存器 (保存当前线程的工作变量),堆栈 (记录执行历史,保存执行了但未返回的过程),等等一些数据这些都是任务状态,都是需要保存和读取的,以下是一些辅助理解的图片。
很多人看到这里可能不理解线程和CPU核心是什么关系
在单核CPU中,操作系统可能会采用轮询的方式进行调度多线程,使多个线程看起来是同时运行的
在多核CPU中,线程则可能会安排在不同的CPU核心中同时运行,从而达到并行处理的目的
线程上下文切换会有什么问题呢?
上下文切换会导致额外的开销,甚至会出现并发执行的速度比串行还慢,因此减少上下文切换次数即可提高多线程任务的效率。
可能的消耗如下:
- CPU寄存器需要保存和加载
- 额外的用户态和内核态的切换
- CPU缓存失效
- …
Race Condition
在一个进程中允许有多个线程,而且共享进程的上下文等资源。那么在程序运行的过程中,进程内的一块数据块可能会被多个线程同时读写,这就会造成数据异常,从而导致不可预料的结果。这就造成了Race Condition
避免产生Race Condition的方法有很多,例如原子操作、临界区、读写锁、信号量、Barrier、Fence、Event等等,这里就不一一进行解释了,感兴趣的可以自行搜索。
FRunnable
简单讲完了一些多线程知识,接下来开始看UE中多线程是怎么实现的。
UE的多线程有三种方式,其中最基础的就是FRunnable,另外两个系统也是借助FRunnable来执行任务的,所以我们先来了解FRunnable。
好,打开UE源码!
class CORE_API FRunnable
{
public:
virtual bool Init(){ return true; } // 初始化,成功返回true
virtual uint32 Run() = 0; // 执行任务,被FRunnableThread调用
virtual void Stop() { } // 请求提前停止
virtual void Exit() { } // 退出,清理数据
virtual class FSingleThreadRunnable* GetSingleThreadInterface(){ return nullptr; }
virtual ~FRunnable() { }
};
class CORE_API FRunnableThread
{
public:
static FRunnableThread* Create(const FRunnable* InRunnable, ...);
virtual void Suspend(bool bShouldPause = true) = 0; // 暂停/继续 线程
virtual bool Kill(bool bShouldWait = true) = 0; // 销毁线程
FRunnableThread();
virtual ~FRunnableThread();
protected:
FRunnable* Runnable; // 执行体
private:
virtual void Tick() {} // FakeThread才会调用
};
这就是FRunnable系统的部分代码,其中FRunnable类是一个线程执行体的抽象概念,真正执行的操作系统线程则被封装在了FRunnableThread,其中静态方法Create就会调用不同平台的库去创建线程,一般来说一个平台都会实现一个FRunnableThread的派生类。
当Runnable被创建并初始化的时候,一般也会创建出FRunnableThread,然后会将FRunnableThread添加到FThreadManager中。如果是真正的多线程那么FRunnableThread会在创建的平台现场开始执行Run函数。如果是单线程平台创建出来的假多线程,那么则会被FThreadManager的Tick在主线程来调用Tick执行。当FRunnableThread的Run或者Tick被执行时,才会去调用FRunnable的Run,Run里就是我们自己写的任务了。
(注意FRunnableThread的Tick和Run两个函数,一个是单线程模式被FThreadManager调用的,一个是被平台线程执行的函数,这里以Windows平台举例)
一个线程的声明周期图:
AsyncTask
说完了UE多线程的"基建",下面来谈一个稍微封装了"亿点"的AsyncTask系统。
FQueuedThreadPool 线程池
在讲AsyncTask之前先讲UE的线程池,先上代码
// 线程池内的线程单位
class FQueuedThread : public FRunnable
{
protected:
FEvent* DoWorkEvent = nullptr; // 告诉线程有任务要执行
TAtomic<bool> TimeToDie { false }; // 线程该退出了
IQueuedWork* volatile QueuedWork = nullptr; // 线程正在执行的任务
class FQueuedThreadPoolBase* OwningThreadPool = nullptr; // 属于的线程池
FRunnableThread* Thread = nullptr; // 真正的线程
virtual uint32 Run() override; // 平台线程调用
public:
// 默认构造函数
FQueuedThread() = default;
// 初始化,创建线程
virtual bool Create(class FQueuedThreadPoolBase* InPool,uint32 InStackSize = 0, EThreadPriority ThreadPriority=TPri_Normal, const TCHAR* ThreadName = nullptr);
// 提前退出线程
bool KillThread();
// 执行线程池派来的任务,执行完后自行返回线程池
// 在任务添加进线程池后且当前线程可用,才会被线程池调用(AddQueuedWork里),正常是在Run函数里不断执行任务
void DoWork(IQueuedWork* InQueuedWork);
};
首先先看线程池内的线程长什么样,FQueueThread通过继承FRunnable来当作线程池内的线程单位。
线程执行任务有两种方式:
- 第一种是通过DoWork来执行,DoWork会在任务被加入到线程池时判断是否有线程空闲,如果有则调用该线程的DoWork来执行任务。
- 第二种是通过FRunnable的Run函数,这个函数内部是一个循环调用,通过ReturnToPoolOrGetNextJob来返回线程池或者获取下一个任务。
执行的具体任务都是通过调用IQueueWork的DoThreadedWork()来执行。
// 线程池的抽象类,实现在FQueueThreadPoolBase里
class CORE_API FQueuedThreadPool
{
public:
// 创建指定数量、栈大小和优先级的线程
virtual bool Create(uint32 InNumQueuedThreads, uint32 StackSize = (32 * 1024), EThreadPriority ThreadPriority = TPri_Normal, const TCHAR* Name = TEXT("UnknownThreadPool")) = 0;
// 清理所有后台线程
virtual void Destroy() = 0;
// 添加IQueueWork任务,如果有可用线程则立即执行,否则会排队
virtual void AddQueuedWork( IQueuedWork* InQueuedWork, EQueuedWorkPriority InQueuedWorkPriority = EQueuedWorkPriority::Normal) = 0;
// 收回之前指派的IQueueWork任务
virtual bool RetractQueuedWork(IQueuedWork* InQueuedWork) = 0;
// 获取池内线程数量
virtual int32 GetNumThreads() const = 0;
public:
FQueuedThreadPool();
virtual ~FQueuedThreadPool();
public:
// 创建线程池对象
static FQueuedThreadPool* Allocate();
// 重写栈大小
static uint32 OverrideStackSize;
};
class FQueuedThreadPoolBase : public FQueuedThreadPool
{
protected:
TArray<IQueuedWork*> QueuedWork; // 需要执行的任务列表
TArray<FQueuedThread*> QueuedThreads; // 线程池内的可用线程
TArray<FQueuedThread*> AllThreads; // 线程池内的所有线程
FCriticalSection* SynchQueue; // 同步临界区
bool TimeToDie; // 超时标记
public:
// ...这里主要是对FQueuedThreadPool里方法的实现
// 返回线程池或者获取下一个任务
IQueuedWork* ReturnToPoolOrGetNextJob(FQueuedThread* InQueuedThread);
};
再来看看线程池长什么样,线程池内会有一个待执行的任务列表(QueueWork)和一个可用线程列表(QueuedThreads),可用线程列表用来查询是否有空闲线程,任务列表则会不断被线程索取任务。
线程池是在引擎PreInit的时候就会进行初始化,而且会根据当前平台和环境决定开的线程数量等等。
IQueueWork和AsyncTask
上面线程池里线程执行的任务单位叫做IQueueWork
// 线程池内执行的任务单位
class IQueuedWork
{
public:
// 线程执行任务的地方
virtual void DoThreadedWork() = 0;
// 提前放弃该任务,清理数据
virtual void Abandon() = 0;
// 获取特殊的任务标记
virtual EQueuedWorkFlags GetQueuedWorkFlags() const { return EQueuedWorkFlags::None; }
// 返回任务执行中可能需要的最大内存大小
virtual int64 GetRequiredMemory() const { return -1 /* Negative value means unknown */; }
public:
virtual ~IQueuedWork() { }
}
可以看到里面就一个简单的执行和放弃任务的方法,而且这是一个接口,具体都是需要继承实现,UE给封装了比较通用的派生类就是FAsyncTask和FAutoDeleteAsyncTask
template<typename TTask>
class FAsyncTask : public FAsyncTaskBase
{
TTask Task;
// ...
// 尝试放弃任务
bool TryAbandonTask() final
{
if (Task.CanAbandon())
{
Task.Abandon();
return true;
}
return false;
}
// 执行Task
void DoTaskWork() final { Task.DoWork(); }
};
FAsyncTask是一个模板类,其中一些繁杂琐事都在FAsyncTaskBase里完成了,我们就来简单看一下FAsyncTask类就行:
- FAsyncTask是一个模板类,真正的TTask需要自己去写,在UE源码里有一大段注释里给了Task的模板,根据这个模板来改写TTask就行
- 使用FAsyncTask就默认要使用UE提供的线程池FQueuedThreadPool
- FAsyncTaskBase里有一个DoneEvent事件,任务完成后会Trigger这个事件,可以在某处调用EnsureCompletion来等待任务完成
- FAsyncTask可选择在当前线程或者新线程执行,通过StartSynchronousTask或StartBackgroundTask选择
FAutoDeleteAsyncTask与FAsyncTask类似,但有一些差异:
- 在任务执行完成后会通过线程池的Destroy函数删除自身;,或者在DoWork后删除自身,而FAsyncTask需要手动 delete(指针)
- 默认使用UE提供的线程池,但是可以通过参数指定使用其他线程
总的来说,FAsyncTask是UE封装的一个方便使用的多线程任务,还实现了线程池的功能,底层还是FRunnable。
TaskGraph
讲完AsyncTask,来看看UE最复杂的多线程系统,他为什么这么复杂呢,主要是因为它支持任务之间的依赖。想象一下要在一个多线程系统中实现任务之间的依赖,需要处理任务先后次序、同步和等待,还需要将效率提升上去,是有多么复杂。TaskGraph用的是DAG (Directed acyclic graph, 有向无环图) 来组织任务之间的执行顺序和依赖关系,还可以用图算法来优化。
FBaseGraphTask
先从TaskGraph中的任务讲起
class FBaseGraphTask
{
protected:
// 构造函数
FBaseGraphTask(int32 InNumberOfPrerequistitesOutstanding);
// 设置任务执行在哪个类型的线程
void SetThreadToExecuteOn(ENamedThreads::Type InThreadToExecuteOn);
// 前置任务完成,或者部分完成
void PrerequisitesComplete(ENamedThreads::Type CurrentThread, int32 NumAlreadyFinishedPrequistes, bool bUnlock = true);
// 带条件(前置任务完成)地执行任务
void ConditionalQueueTask(ENamedThreads::Type CurrentThread, bool& bWakeUpWorker);
// 析构函数
virtual ~FBaseGraphTask();
private:
// Subclass API
// 真正地执行任务,由子类实现
virtual void ExecuteTask(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThread, bool bDeleteOnCompletion)=0;
// 清理数据
virtual void DeleteTask() = 0;
// 执行任务
FORCEINLINE void Execute(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThread, bool bDeleteOnCompletion);
// Internal Use
// 将任务加入排队队列
void QueueTask(ENamedThreads::Type CurrentThreadIfKnown, bool bWakeUpWorker);
ENamedThreads::Type ThreadToExecuteOn; // 执行该任务地线程类型
FThreadSafeCounter NumberOfPrerequistitesOutstanding; // 前置任务的数量,降为0时执行该任务
};
任务的基类和IQueuedWork类似,都是开放了一些接口,主要实现是在它的子类TGraphTask。区别就是多了一些对前置任务的管理和判断。
template<typename TTask>
class TGraphTask final : public FBaseGraphTask
{
public:
// 构造任务的辅助类,任务构造器
class FConstructor
{
public:
// 构造TTask任务,设置TGraphTask数据,然后执行
template<typename...T>
FGraphEventRef ConstructAndDispatchWhenReady(T&&... Args);
// 构造TTask任务,设置TGraphTask数据,然后Hold住
template<typename...T>
TGraphTask* ConstructAndHold(T&&... Args);
private:
TGraphTask* Owner; // 构造器所在的TGraphTask
const FGraphEventArray* Prerequisites; // 前置任务
ENamedThreads::Type CurrentThreadIfKnown; // 当前线程或者为AnyThread
};
// 创建任务,返回的是构造器对象,以便对任务执行后续操作
static FConstructor CreateTask(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread);
// 解锁任务(从Hold状态解锁)
void Unlock(ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread);
private:
// 实现执行任务
// 1.执行TTask
// 2.销毁TTask
// 3.执行后续任务
// 4.销毁this指针
void ExecuteTask(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThread, bool bDeleteOnCompletion) override;
// 设置前置任务,在Setup里调用
// 1.创建任务完成事件
// 2.基于TTask设置要执行的线程
// 3.尝试在每个前置任务(Prerequisites是前置任务数组)的后续任务里添加自己
// 4.告诉基类,当前置任务完成后,就可以入队排队执行了
void SetupPrereqs(const FGraphEventArray* Prerequisites, ENamedThreads::Type CurrentThreadIfKnown, bool bUnlock);
// 设置任务,在FConstructor里调用,主要调用SetupPrereqs
FGraphEventRef Setup(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
// 设置任务,在FConstructor里调用,和Setup的区别就是还不允许执行任务,Hold住
TGraphTask* Hold(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
// 创建任务,主要用在前置任务完成后,执行后续任务时创建后续任务
static FConstructor CreateTask(FGraphEventRef SubsequentsToAssume, const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
TAlignedBytes<sizeof(TTask),alignof(TTask)> TaskStorage; // 自己写的TTask,数据对齐
bool TaskConstructed; // 任务是否已经被创建了
FGraphEventRef Subsequents; // 指向后续任务的引用计数指针,列出了以我为前置任务的任务
};
和AsyncTask一样,真正执行的TTask也是需要我们自己构建的,在源码里有给出模板可以自己改模板实现。
和AsyncTask不同的是,GraphTask里出现了一些对前置任务和后续任务的处理。包括在创建任务的时候设置自身的前置任务,并且将自身加到前置任务的后续任务引用计数指针里。在执行完当前任务后就需要通知后续任务:我已经执行完毕,请将你的前置任务数量减一,当减为0的时候请执行。
FGraphEvent
上面我们看到对于任务依赖我们用到了FGraphEventRef这样一个事件工具类,这个类其实就是UE封装的FGraphEvent的指针:typedef TRefCountPtr<class FGraphEvent> FGraphEventRef;
。这个类就是用来维护图中所有任务关系,用来传递任务完成状态的,如果某个任务完成了,就会将其完成的事件传给后续任务。
class FGraphEvent
{
public:
// 创建GraphEvent的工厂方法
static CORE_API FGraphEventRef CreateGraphEvent();
// 尝试为当前任务添加后续任务,如果该后续任务已经完成则返回false,防止任务图出现闭环
bool AddSubsequent(class FBaseGraphTask* Subsequent);
// 延迟当前任务,直到EventToWaitFor任务完成,只有这两个任务有关联时才合法
void DontCompleteUntil(FGraphEventRef EventToWaitFor)
// 执行后续任务,在TaskGraph中ExecuteTask第3步调用
CORE_API void DispatchSubsequents(ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread);
CORE_API void DispatchSubsequents(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread);
// 等待此任务执行完毕
void Wait(ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread);
private:
TClosableLockFreePointerListUnorderedSingleConsumer<FBaseGraphTask, 0> SubsequentList; // 后续任务列表,线程安全
FGraphEventArray EventsToWaitFor; // 前置任务列表,线程不安全
};
主要都是在GraphTask里来调用的事件工具。
FTaskThread
TaskGraph内的线程分为两种:
- 一种是NamedThread,这类线程又称为命名线程,是UE专门预留出来的带名字的线程,每个线程都有它专门的工作任务,目前支持的命名线程有
- StatsThread,性能统计线程
- RHIThread,RHI线程,主要用来提交RHI绘制命令
- AudioThread,音频线程
- GameThread,游戏线程,主线程
- ActualRenderingThread,渲染线程,主要处理网格体收集,裁剪等等任务
- 另一种是AnyThread,这是TaskGraph系统创建的一堆后台线程,用来处理通用任务,线程数量由当前平台决定。
统计线程和音频线程
FTaskThreadBase
FTaskThreadBase是TaskGraph内线程的基类,定义了一组设置、操作任务的接口,而且又能看到我们的老朋友FRunnable。
class FTaskThreadBase : public FRunnable, FSingleThreadRunnable
{
public:
// 构造函数
FTaskThreadBase();
// 设置一些线程的基础数据,从主线程调用
void Setup(ENamedThreads::Type InThreadId, uint32 InPerThreadIDTLSSlot, FWorkerThread* InOwnerWorker);
// ------以下从当前线程调用------
// 初始化
void InitializeForCurrentThread();
// 获取当前线程Id
ENamedThreads::Type GetThreadId() const;
// 用于NameThread的开始处理任务,直到退出
virtual void ProcessTasksUntilQuit(int32 QueueIndex) = 0;
virtual uint64 ProcessTasksUntilIdle(int32 QueueIndex);
// 从当前线程添加任务到队列,假设任务要执行的线程和当前线程相同
virtual void EnqueueFromThisThread(int32 QueueIndex, FBaseGraphTask* Task);
// ------以下从任何线程调用------
// 用与返回NamedThread或者关闭AnyThread
virtual void RequestQuit(int32 QueueIndex) = 0;
// 从其他线程添加任务到当前线程任务列表
virtual bool EnqueueFromOtherThread(int32 QueueIndex, FBaseGraphTask* Task);
// 唤醒线程
virtual void WakeUp(int32 QueueIndex = 0);
// 当前线程是否正在处理任务
virtual bool IsProcessingTasks(int32 QueueIndex) = 0;
// ------单线程API(来自FSingleThreadRunnable)------
// 初始化
virtual bool Init() override;
// 执行任务
virtual uint32 Run() override
// 提前退出
virtual void Stop() override
// 退出并清理数据
virtual void Exit() override
// 返回单线程接口
virtual FSingleThreadRunnable* GetSingleThreadInterface() override
protected:
ENamedThreads::Type ThreadId; // 当前线程的Id
uint32 PerThreadIDTLSSlot; // TLS(Thread Local Stage)插槽
FThreadSafeCounter IsStalled; // 阻塞计数器,用来触发阻塞信号,大部分时间同步不安全
TArray<FBaseGraphTask*> NewTasks; // 当前线程的任务列表
FWorkerThread* OwnerWorker; // 指向所属的WorkerThread
};
FTaskThreadBase只是抽象类,具体的实现由两个子类FNamedTaskThread和FTaskThreadAnyThread完成。
FNamedTaskThread
NamedThread是TaskGraph外面创建的线程 (引擎会在合适的时机自动创建,不需要手动),创建后可以通过Attach操作绑定到TaskGraph上。
NamedThread有一个优先级的概念,每个线程拥有两个任务队列,MainQueue和LocalQueue
class FNamedTaskThread : public FTaskThreadBase
{
public:
// NamedThread处理任务
virtual void ProcessTasksUntilQuit(int32 QueueIndex);
virtual uint64 ProcessTasksUntilIdle(int32 QueueIndex);
// 真正的处理任务
uint64 ProcessTasksNamedThread(int32 QueueIndex, bool bAllowStall);
// 一些FTaskThreadBase的方法实现......
virtual void EnqueueFromThisThread(int32 QueueIndex, FBaseGraphTask* Task);
virtual void RequestQuit(int32 QueueIndex);
virtual bool EnqueueFromOtherThread(int32 QueueIndex, FBaseGraphTask* Task);
virtual void WakeUp(int32 QueueIndex = 0);
virtual bool IsProcessingTasks(int32 QueueIndex) = 0;
private:
// 定义了一个任务队列
struct FThreadTaskQueue
{
FStallingTaskQueue<FBaseGraphTask, PLATFORM_CACHE_LINE_SIZE, 2> StallQueue; // 阻塞的任务队列
uint32 RecursionGuard; // 防止循环(递归)调用
bool QuitForReturn; // 是否执行了一个返回任务
bool QuitForShutdown; // 是否执行了一个退出任务
FEvent* StallRestartEvent; // 当前线程满载时的阻塞事件
FThreadTaskQueue();
~FThreadTaskQueue();
};
// 返回索引为QueueIndex的任务队列
FORCEINLINE FThreadTaskQueue& Queue(int32 QueueIndex);
FORCEINLINE const FThreadTaskQueue& Queue(int32 QueueIndex) const;
// 两个任务队列
FThreadTaskQueue Queues[ENamedThreads::NumQueues];
};
FTaskThreadAnyThread
AnyThread是在TaskGraph初始化得时候创建的工作线程,还包括了三种优先级:NormalThreadPriority,HighThreadPriority,BackgroundThreadPriority
AnyThread通常用来处理通用的多线程任务,所以创建的数量非常多,处理任务的方式也和NamedThread有所不同,主要就是通过FindWork从TaskGraph获取相应优先级的任务。
class FTaskThreadAnyThread : public FTaskThreadBase
{
public:
// 构造函数,并指定该线程优先级
FTaskThreadAnyThread(int32 InPriorityIndex): PriorityIndex(InPriorityIndex) {};
// AnyThread 处理任务
virtual void ProcessTasksUntilQuit(int32 QueueIndex);
virtual uint64 ProcessTasksUntilIdle(int32 QueueIndex);
// 一些FTaskThreadBase的方法实现......
virtual void RequestQuit(int32 QueueIndex);
virtual void WakeUp(int32 QueueIndex = 0);
virtual bool IsProcessingTasks(int32 QueueIndex) = 0;
private:
// 真正的处理任务
uint64 ProcessTasks();
// 定义了一个任务队列,与NamedThread的有些许不同
struct FThreadTaskQueue
{
FEvent* StallRestartEvent; // 做完工作得阻塞事件
uint32 RecursionGuard; // 防止循环(递归)调用
bool QuitForShutdown; // 是否执行了一个退出任务
bool bStallForTuning; // 是否需要暂停
FCriticalSection StallForTuning; // 阻塞临界区
FThreadTaskQueue()
~FThreadTaskQueue()
};
// 从TaskGraph系统中获取任务
FBaseGraphTask* FindWork()
{
return TaskGraphImplementationSingleton->FindWork(ThreadId);
}
FThreadTaskQueue Queue; // 任务队列
int32 PriorityIndex; // 优先级
};
TaskGraph的线程类型就只有这两种,处理特定任务用NamedThread,平时处理通用的多线程任务则可以用AnyThread。
FWorkerThread
我们还可以从上面线程的基类中看到一个结构体:FWorkerThread* OwnerWorker;
这是TaskGraph里真正操作的线程对象,只是简单的将FTaskThread对象(继承FRunnable)和FRunnableThread对象封装起来了。
struct FWorkerThread
{
FTaskThreadBase* TaskGraphWorker; // FTaskThread对象,有可能是NamedThread或者AnyThread
FRunnableThread* RunnableThread; // 真正执行任务的线程
bool bAttached; // 线程是否已经附加到TaskGraph上,主要用在NamedThread
FWorkerThread(): TaskGraphWorker(nullptr), RunnableThread(nullptr), bAttached(false) { }
};
TaskGraph System
看完了任务和线程,该看看本尊真面目了。
经典的写法就是整个系统分为FTaskGraphInterface和FTaskGraphImplementation两个部分,其中FTaskGraphInterface是一些父类接口,真正实现的部分则是在FTaskGraphImplementation里。
class FTaskGraphImplementation final : public FTaskGraphInterface
{
public:
// 单例
static FTaskGraphImplementation& Get();
// 构造函数
FTaskGraphImplementation(int32);
// 析构函数
virtual ~FTaskGraphImplementation();
// -------继承自FTaskGraphInterface------
// 入队任务
virtual void QueueTask(FBaseGraphTask* Task, bool bWakeUpWorker, ENamedThreads::Type ThreadToExecuteOn, ENamedThreads::Type InCurrentThreadIfKnown = ENamedThreads::AnyThread) final override;
// 获取各种线程数量
virtual int32 GetNumWorkerThreads() final override;
virtual int32 GetNumForegroundThreads() final override;
virtual int32 GetNumBackgroundThreads() final override;
// 获取当前线程信息
virtual bool IsCurrentThreadKnown() final override;
virtual ENamedThreads::Type GetCurrentThreadIfKnown(bool bLocalQueue) final override;
// 该线程是否正在运行任务
virtual bool IsThreadProcessingTasks(ENamedThreads::Type ThreadToCheck) final override;
// ------外部线程(主要是NamedThread)API-------
// 附加线程到TaskGraph系统中
virtual void AttachToThread(ENamedThreads::Type CurrentThread) final override;
// 处理任务
virtual uint64 ProcessThreadUntilIdle(ENamedThreads::Type CurrentThread) final override;
virtual void ProcessThreadUntilRequestReturn(ENamedThreads::Type CurrentThread) final override;
// 请求返回
virtual void RequestReturn(ENamedThreads::Type CurrentThread) final override;
// 等待任务执行完成
virtual void WaitUntilTasksComplete(const FGraphEventArray& Tasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread) final override;
virtual void TriggerEventWhenTasksComplete(FEvent* InEvent, const FGraphEventArray& Tasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread, ENamedThreads::Type TriggerThread = ENamedThreads::AnyHiPriThreadHiPriTask) final override;
// 添加一个停止的回调
virtual void AddShutdownCallback(TFunction<void()>& Callback);
// 唤醒NamedThread
virtual void WakeNamedThread(ENamedThreads::Type ThreadToWake) override;
// ------调度工具------
// 启动线程
void StartTaskThread(int32 Priority, int32 IndexToStart);
void StartAllTaskThreads(bool bDoBackgroundThreads);
// 返回一个任务给线程,从AnyThread线程调用
FBaseGraphTask* FindWork(ENamedThreads::Type ThreadInNeed) override;
// 暂停线程
void StallForTuning(int32 Index, bool Stall) override
// 设置FRunnableThread的优先级
void SetTaskThreadPriorities(EThreadPriority Pri)
private:
// 返回一个线程
FTaskThreadBase& Thread(int32 Index);
// 获取当前线程Type
ENamedThreads::Type GetCurrentThread()
// 获取线程的优先级
int32 ThreadIndexToPriorityIndex(int32 ThreadIndex)
FWorkerThread WorkerThreads[MAX_THREADS]; // 工作线程,封装了FTaskThread和FRunnableThread
int32 NumThreads; // 实际使用的线程数量
int32 NumNamedThreads; // 实际使用的NamedThread数量
int32 NumTaskThreadSets; // AnyThread线程集合数
int32 NumTaskThreadsPerSet; // 每个Anythread线程集合里包含的线程数
bool bCreatedHiPriorityThreads; // 是否有高优先级AnyThread
bool bCreatedBackgroundPriorityThreads; // 是否有后台AnyThread
ENamedThreads::Type LastExternalThread;
FThreadSafeCounter ReentrancyCheck;
uint32 PerThreadIDTLSSlot; // 每个线程的PLS插槽
TArray<TFunction<void()>> ShutdownCallbacks; // 停止的回调们
FStallingTaskQueue<FBaseGraphTask, PLATFORM_CACHE_LINE_SIZE, 2> IncomingAnyThreadTasks[MAX_THREAD_PRIORITIES]; // 任务列表,包含了NamedThread的也包含AnyThread的
};
既然FTaskGraphImplementation是继承的FTaskGraphInterface,那我们直接看FTaskGraphImplementation就行。
看这个庞大的类,其实主要是用来管理AnyThread和它的任务的,TaskGraph会根据线程优先级等等条件来创建FWorkerThread,然后入队任务时将任务Push到IncomingAnyThreadTasks,每个AnyThread都可以通过FindWork来索取任务进行处理。
下面是一些辅助理解的图片:(感谢大佬们的图)