UE多线程机制

Kyrie
发布于

最近在从Unity转UE的学习过程中,在学习了几天之后在此做个知识沉淀,希望能够比较理清UE的知识。
要了解UE的渲染模块最开始接触的就是多线程机制,照例个人还是喜欢多图模式来辅助理解。
知乎版链接:https://zhuanlan.zhihu.com/p/622013534

多线程基础

这部分基础讲解主要来自GAMES104,加上亿点点网上资料整理。
这里只整理了部分基础知识,要想系统性的了解多线程则需要学习操作系统和Linux内核。

首先是为什么要有多线程机制?随着现在集成电路的工艺和晶体管密度的提升,人类已经接近触及到量子力学的极限,单核的性能提升越发无力。既然不能把核越做越快,那么要想提升算力的另一个方法就是越做越多。

进程和线程


我们还需要搞清楚进程和线程是什么:进程和线程都是一个时间段的描述,这个时间段是CPU工作时间段。
在CPU看来,任务都是一个一个执行的,按照这样的方法执行:

  1. 加载进程A的上下文
  2. 执行进程A的任务
  3. 保存进程A的上下文
  4. 加载进程B的上下文

那么一个进程,其实就是上面的123步,是对运行中的程序段的描述,也是对这一段CPU执行时间的描述。
那么线程是什么呢?上面一个进程的颗粒度太大,每次执行都要进行上下文的切换(耗时),一个进程的任务不可能只是一条逻辑执行的,必然会有多个分支和多个程序段,这时就可以把进程A分成a,b,c等多个块组成,那么这里具体的执行就可能变成:

  1. 加载进程A的上下文
  2. 开始进程A的任务
    1. 执行进程A的a小段
    2. 执行进程A的b小段
    3. 执行进程A的c小段
  3. 保存进程A的上下文

这里的abc就是线程,也就是说线程是共享了进程的上下文的更小的CPU时间段,主要共享的是进程的地址空间。
总结:进程和线程都是一个CPU时间段的描述,只不过是颗粒大小不同,线程是依赖于进程内部的,是CPU执行任务的最小单位。
当然这里的概念只是辅助理解,与实际代码中的进程和线程有一些区别。

多任务处理


多任务操作系统分为两类:

  1. Preemptive Multitasking (抢占式),这种方式会有一个调度器 (Scheduler,一段用于调度进程的程序)来决定此时该执行哪些任务,如果是别的任务则会强制打断当前任务,大部分操作系统就是这种。
  2. Non-preempive Multitasking (非抢占式) 也可叫做 Cooperative,协作式。这种方式会让当前任务执行到自愿停下来,这个自愿停止的行为成为 yield。但是如果某个任务阻塞了,那么整个系统就阻塞了。

线程上下文切换


上面说进程之间有上下文读取和保存 (切换),其实线程之间也有。CPU在此主要做的任务就是将当前任务的状态保存下来,存到内存中的某个位置,然后加载下一个任务的状态,这个过程就叫做上下文切换。
每个线程都有一个程序计数器 (记录要执行的下一条指令),一组寄存器 (保存当前线程的工作变量),堆栈 (记录执行历史,保存执行了但未返回的过程),等等一些数据这些都是任务状态,都是需要保存和读取的,以下是一些辅助理解的图片。

很多人看到这里可能不理解线程和CPU核心是什么关系
在单核CPU中,操作系统可能会采用轮询的方式进行调度多线程,使多个线程看起来是同时运行的

在多核CPU中,线程则可能会安排在不同的CPU核心中同时运行,从而达到并行处理的目的

线程上下文切换会有什么问题呢?
上下文切换会导致额外的开销,甚至会出现并发执行的速度比串行还慢,因此减少上下文切换次数即可提高多线程任务的效率。
可能的消耗如下:

  • CPU寄存器需要保存和加载
  • 额外的用户态和内核态的切换
  • CPU缓存失效

Race Condition

在一个进程中允许有多个线程,而且共享进程的上下文等资源。那么在程序运行的过程中,进程内的一块数据块可能会被多个线程同时读写,这就会造成数据异常,从而导致不可预料的结果。这就造成了Race Condition

避免产生Race Condition的方法有很多,例如原子操作、临界区、读写锁、信号量、Barrier、Fence、Event等等,这里就不一一进行解释了,感兴趣的可以自行搜索。

FRunnable

简单讲完了一些多线程知识,接下来开始看UE中多线程是怎么实现的。
UE的多线程有三种方式,其中最基础的就是FRunnable,另外两个系统也是借助FRunnable来执行任务的,所以我们先来了解FRunnable。

好,打开UE源码!

class CORE_API FRunnable
{
public:
    virtual bool Init(){ return true; }	// 初始化,成功返回true
    virtual uint32 Run() = 0; // 执行任务,被FRunnableThread调用
    virtual void Stop() { }	// 请求提前停止
    virtual void Exit() { }	// 退出,清理数据
    virtual class FSingleThreadRunnable* GetSingleThreadInterface(){ return nullptr; }
    virtual ~FRunnable() { }
};

class CORE_API FRunnableThread
{
public:
	static FRunnableThread* Create(const FRunnable* InRunnable, ...);
    virtual void Suspend(bool bShouldPause = true) = 0; // 暂停/继续 线程
	virtual bool Kill(bool bShouldWait = true) = 0; // 销毁线程
	FRunnableThread();
	virtual ~FRunnableThread();
protected:
	FRunnable* Runnable; // 执行体
private:
	virtual void Tick() {} // FakeThread才会调用
};

这就是FRunnable系统的部分代码,其中FRunnable类是一个线程执行体的抽象概念,真正执行的操作系统线程则被封装在了FRunnableThread,其中静态方法Create就会调用不同平台的库去创建线程,一般来说一个平台都会实现一个FRunnableThread的派生类。
当Runnable被创建并初始化的时候,一般也会创建出FRunnableThread,然后会将FRunnableThread添加到FThreadManager中。如果是真正的多线程那么FRunnableThread会在创建的平台现场开始执行Run函数。如果是单线程平台创建出来的假多线程,那么则会被FThreadManager的Tick在主线程来调用Tick执行。当FRunnableThread的Run或者Tick被执行时,才会去调用FRunnable的Run,Run里就是我们自己写的任务了。
(注意FRunnableThread的Tick和Run两个函数,一个是单线程模式被FThreadManager调用的,一个是被平台线程执行的函数,这里以Windows平台举例)

一个线程的声明周期图:

AsyncTask

说完了UE多线程的"基建",下面来谈一个稍微封装了"亿点"的AsyncTask系统。

FQueuedThreadPool 线程池

在讲AsyncTask之前先讲UE的线程池,先上代码

// 线程池内的线程单位
class FQueuedThread : public FRunnable
{
protected:
	FEvent* DoWorkEvent = nullptr; // 告诉线程有任务要执行
	TAtomic<bool> TimeToDie { false }; // 线程该退出了
	IQueuedWork* volatile QueuedWork = nullptr; // 线程正在执行的任务
	class FQueuedThreadPoolBase* OwningThreadPool = nullptr; // 属于的线程池
	FRunnableThread* Thread = nullptr; // 真正的线程
	virtual uint32 Run() override; // 平台线程调用
public:
	// 默认构造函数
	FQueuedThread() = default;
	// 初始化,创建线程
	virtual bool Create(class FQueuedThreadPoolBase* InPool,uint32 InStackSize = 0, EThreadPriority ThreadPriority=TPri_Normal, const TCHAR* ThreadName = nullptr);
	// 提前退出线程
	bool KillThread();
	// 执行线程池派来的任务,执行完后自行返回线程池
	// 在任务添加进线程池后且当前线程可用,才会被线程池调用(AddQueuedWork里),正常是在Run函数里不断执行任务
	void DoWork(IQueuedWork* InQueuedWork);
};

首先先看线程池内的线程长什么样,FQueueThread通过继承FRunnable来当作线程池内的线程单位。
线程执行任务有两种方式:

  1. 第一种是通过DoWork来执行,DoWork会在任务被加入到线程池时判断是否有线程空闲,如果有则调用该线程的DoWork来执行任务。
  2. 第二种是通过FRunnable的Run函数,这个函数内部是一个循环调用,通过ReturnToPoolOrGetNextJob来返回线程池或者获取下一个任务。

执行的具体任务都是通过调用IQueueWork的DoThreadedWork()来执行。

// 线程池的抽象类,实现在FQueueThreadPoolBase里
class CORE_API FQueuedThreadPool
{
public:
	// 创建指定数量、栈大小和优先级的线程
	virtual bool Create(uint32 InNumQueuedThreads, uint32 StackSize = (32 * 1024), EThreadPriority ThreadPriority = TPri_Normal, const TCHAR* Name = TEXT("UnknownThreadPool")) = 0;
	// 清理所有后台线程
	virtual void Destroy() = 0;
	// 添加IQueueWork任务,如果有可用线程则立即执行,否则会排队
	virtual void AddQueuedWork( IQueuedWork* InQueuedWork, EQueuedWorkPriority InQueuedWorkPriority = EQueuedWorkPriority::Normal) = 0;
	// 收回之前指派的IQueueWork任务
	virtual bool RetractQueuedWork(IQueuedWork* InQueuedWork) = 0;
	// 获取池内线程数量
	virtual int32 GetNumThreads() const = 0;

public:
    FQueuedThreadPool();
	virtual	~FQueuedThreadPool();

public:
	// 创建线程池对象
	static FQueuedThreadPool* Allocate();
	// 重写栈大小
	static uint32 OverrideStackSize;
};

class FQueuedThreadPoolBase : public FQueuedThreadPool
{
protected:
    TArray<IQueuedWork*> QueuedWork; // 需要执行的任务列表
    TArray<FQueuedThread*> QueuedThreads; // 线程池内的可用线程
    TArray<FQueuedThread*> AllThreads;    // 线程池内的所有线程
    FCriticalSection* SynchQueue; // 同步临界区
    bool TimeToDie; // 超时标记

public:
    // ...这里主要是对FQueuedThreadPool里方法的实现

	// 返回线程池或者获取下一个任务
	IQueuedWork* ReturnToPoolOrGetNextJob(FQueuedThread* InQueuedThread);
};

再来看看线程池长什么样,线程池内会有一个待执行的任务列表(QueueWork)和一个可用线程列表(QueuedThreads),可用线程列表用来查询是否有空闲线程,任务列表则会不断被线程索取任务。
线程池是在引擎PreInit的时候就会进行初始化,而且会根据当前平台和环境决定开的线程数量等等。

IQueueWork和AsyncTask

上面线程池里线程执行的任务单位叫做IQueueWork

// 线程池内执行的任务单位
class IQueuedWork
{
public:
	// 线程执行任务的地方
	virtual void DoThreadedWork() = 0;
	// 提前放弃该任务,清理数据
	virtual void Abandon() = 0;
	// 获取特殊的任务标记
	virtual EQueuedWorkFlags GetQueuedWorkFlags() const { return EQueuedWorkFlags::None; }
	// 返回任务执行中可能需要的最大内存大小
	virtual int64 GetRequiredMemory() const { return -1 /* Negative value means unknown */; }
public:
	virtual ~IQueuedWork() { }
}

可以看到里面就一个简单的执行和放弃任务的方法,而且这是一个接口,具体都是需要继承实现,UE给封装了比较通用的派生类就是FAsyncTask和FAutoDeleteAsyncTask

template<typename TTask>
class FAsyncTask : public FAsyncTaskBase
{
	TTask Task;

	// ...

	// 尝试放弃任务
	bool TryAbandonTask() final
	{
		if (Task.CanAbandon())
		{
			Task.Abandon();
			return true;
		}
		return false;
	}

	// 执行Task
	void DoTaskWork() final { Task.DoWork(); }
};

FAsyncTask是一个模板类,其中一些繁杂琐事都在FAsyncTaskBase里完成了,我们就来简单看一下FAsyncTask类就行:

  1. FAsyncTask是一个模板类,真正的TTask需要自己去写,在UE源码里有一大段注释里给了Task的模板,根据这个模板来改写TTask就行
  2. 使用FAsyncTask就默认要使用UE提供的线程池FQueuedThreadPool
  3. FAsyncTaskBase里有一个DoneEvent事件,任务完成后会Trigger这个事件,可以在某处调用EnsureCompletion来等待任务完成
  4. FAsyncTask可选择在当前线程或者新线程执行,通过StartSynchronousTask或StartBackgroundTask选择

FAutoDeleteAsyncTask与FAsyncTask类似,但有一些差异:

  1. 在任务执行完成后会通过线程池的Destroy函数删除自身;,或者在DoWork后删除自身,而FAsyncTask需要手动 delete(指针)
  2. 默认使用UE提供的线程池,但是可以通过参数指定使用其他线程

总的来说,FAsyncTask是UE封装的一个方便使用的多线程任务,还实现了线程池的功能,底层还是FRunnable。

TaskGraph

讲完AsyncTask,来看看UE最复杂的多线程系统,他为什么这么复杂呢,主要是因为它支持任务之间的依赖。想象一下要在一个多线程系统中实现任务之间的依赖,需要处理任务先后次序、同步和等待,还需要将效率提升上去,是有多么复杂。TaskGraph用的是DAG (Directed acyclic graph, 有向无环图) 来组织任务之间的执行顺序和依赖关系,还可以用图算法来优化。

FBaseGraphTask

先从TaskGraph中的任务讲起

class FBaseGraphTask
{
protected:
	// 构造函数
	FBaseGraphTask(int32 InNumberOfPrerequistitesOutstanding);
	// 设置任务执行在哪个类型的线程
	void SetThreadToExecuteOn(ENamedThreads::Type InThreadToExecuteOn);
	// 前置任务完成,或者部分完成
	void PrerequisitesComplete(ENamedThreads::Type CurrentThread, int32 NumAlreadyFinishedPrequistes, bool bUnlock = true);
	// 带条件(前置任务完成)地执行任务
	void ConditionalQueueTask(ENamedThreads::Type CurrentThread, bool& bWakeUpWorker);
	// 析构函数
	virtual ~FBaseGraphTask();
private:
	// Subclass API
	// 真正地执行任务,由子类实现
	virtual void ExecuteTask(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThread, bool bDeleteOnCompletion)=0;
	// 清理数据
	virtual void DeleteTask() = 0;
	// 执行任务
	FORCEINLINE void Execute(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThread, bool bDeleteOnCompletion);

	// Internal Use
	// 将任务加入排队队列
	void QueueTask(ENamedThreads::Type CurrentThreadIfKnown, bool bWakeUpWorker);
	ENamedThreads::Type	ThreadToExecuteOn; // 执行该任务地线程类型
	FThreadSafeCounter NumberOfPrerequistitesOutstanding; // 前置任务的数量,降为0时执行该任务
};

任务的基类和IQueuedWork类似,都是开放了一些接口,主要实现是在它的子类TGraphTask。区别就是多了一些对前置任务的管理和判断。

template<typename TTask>
class TGraphTask final : public FBaseGraphTask
{
public:
	// 构造任务的辅助类,任务构造器
	class FConstructor
	{
	public:
		// 构造TTask任务,设置TGraphTask数据,然后执行
		template<typename...T>
		FGraphEventRef ConstructAndDispatchWhenReady(T&&... Args);
		// 构造TTask任务,设置TGraphTask数据,然后Hold住
		template<typename...T>
		TGraphTask* ConstructAndHold(T&&... Args);
	private:
		TGraphTask*	Owner; // 构造器所在的TGraphTask
		const FGraphEventArray*	Prerequisites; // 前置任务
		ENamedThreads::Type	CurrentThreadIfKnown; // 当前线程或者为AnyThread
	};

	// 创建任务,返回的是构造器对象,以便对任务执行后续操作
	static FConstructor CreateTask(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread);
	// 解锁任务(从Hold状态解锁)
	void Unlock(ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread);

private:
	// 实现执行任务
	// 1.执行TTask 
	// 2.销毁TTask 
	// 3.执行后续任务 
	// 4.销毁this指针
	void ExecuteTask(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThread, bool bDeleteOnCompletion) override;

	// 设置前置任务,在Setup里调用
	// 1.创建任务完成事件 
	// 2.基于TTask设置要执行的线程
	// 3.尝试在每个前置任务(Prerequisites是前置任务数组)的后续任务里添加自己
	// 4.告诉基类,当前置任务完成后,就可以入队排队执行了
	void SetupPrereqs(const FGraphEventArray* Prerequisites, ENamedThreads::Type CurrentThreadIfKnown, bool bUnlock);

	// 设置任务,在FConstructor里调用,主要调用SetupPrereqs
	FGraphEventRef Setup(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
	// 设置任务,在FConstructor里调用,和Setup的区别就是还不允许执行任务,Hold住
	TGraphTask* Hold(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
	// 创建任务,主要用在前置任务完成后,执行后续任务时创建后续任务
	static FConstructor CreateTask(FGraphEventRef SubsequentsToAssume, const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)

	TAlignedBytes<sizeof(TTask),alignof(TTask)> TaskStorage; // 自己写的TTask,数据对齐
	bool TaskConstructed; // 任务是否已经被创建了
	FGraphEventRef Subsequents; // 指向后续任务的引用计数指针,列出了以我为前置任务的任务
};

和AsyncTask一样,真正执行的TTask也是需要我们自己构建的,在源码里有给出模板可以自己改模板实现。
和AsyncTask不同的是,GraphTask里出现了一些对前置任务和后续任务的处理。包括在创建任务的时候设置自身的前置任务,并且将自身加到前置任务的后续任务引用计数指针里。在执行完当前任务后就需要通知后续任务:我已经执行完毕,请将你的前置任务数量减一,当减为0的时候请执行。

FGraphEvent

上面我们看到对于任务依赖我们用到了FGraphEventRef这样一个事件工具类,这个类其实就是UE封装的FGraphEvent的指针:typedef TRefCountPtr<class FGraphEvent> FGraphEventRef;。这个类就是用来维护图中所有任务关系,用来传递任务完成状态的,如果某个任务完成了,就会将其完成的事件传给后续任务。

class FGraphEvent 
{
public:
	// 创建GraphEvent的工厂方法
	static CORE_API FGraphEventRef CreateGraphEvent();
	// 尝试为当前任务添加后续任务,如果该后续任务已经完成则返回false,防止任务图出现闭环
	bool AddSubsequent(class FBaseGraphTask* Subsequent)// 延迟当前任务,直到EventToWaitFor任务完成,只有这两个任务有关联时才合法
	void DontCompleteUntil(FGraphEventRef EventToWaitFor)
	// 执行后续任务,在TaskGraph中ExecuteTask第3步调用
	CORE_API void DispatchSubsequents(ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread);
	CORE_API void DispatchSubsequents(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread);
	// 等待此任务执行完毕
	void Wait(ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread);

private:
	TClosableLockFreePointerListUnorderedSingleConsumer<FBaseGraphTask, 0> SubsequentList; // 后续任务列表,线程安全
	FGraphEventArray EventsToWaitFor; // 前置任务列表,线程不安全
};

主要都是在GraphTask里来调用的事件工具。

FTaskThread

TaskGraph内的线程分为两种:

  • 一种是NamedThread,这类线程又称为命名线程,是UE专门预留出来的带名字的线程,每个线程都有它专门的工作任务,目前支持的命名线程有
    • StatsThread,性能统计线程
    • RHIThread,RHI线程,主要用来提交RHI绘制命令
    • AudioThread,音频线程
    • GameThread,游戏线程,主线程
    • ActualRenderingThread,渲染线程,主要处理网格体收集,裁剪等等任务
  • 另一种是AnyThread,这是TaskGraph系统创建的一堆后台线程,用来处理通用任务,线程数量由当前平台决定。

统计线程和音频线程

FTaskThreadBase

FTaskThreadBase是TaskGraph内线程的基类,定义了一组设置、操作任务的接口,而且又能看到我们的老朋友FRunnable。

class FTaskThreadBase : public FRunnable, FSingleThreadRunnable
{
public:
	// 构造函数
	FTaskThreadBase();
	// 设置一些线程的基础数据,从主线程调用
	void Setup(ENamedThreads::Type InThreadId, uint32 InPerThreadIDTLSSlot, FWorkerThread* InOwnerWorker);

	// ------以下从当前线程调用------
	// 初始化
	void InitializeForCurrentThread();
	// 获取当前线程Id
	ENamedThreads::Type GetThreadId() const;
	// 用于NameThread的开始处理任务,直到退出
	virtual void ProcessTasksUntilQuit(int32 QueueIndex) = 0;
	virtual uint64 ProcessTasksUntilIdle(int32 QueueIndex);
	// 从当前线程添加任务到队列,假设任务要执行的线程和当前线程相同
	virtual void EnqueueFromThisThread(int32 QueueIndex, FBaseGraphTask* Task);

	// ------以下从任何线程调用------
	// 用与返回NamedThread或者关闭AnyThread
	virtual void RequestQuit(int32 QueueIndex) = 0;
	// 从其他线程添加任务到当前线程任务列表
	virtual bool EnqueueFromOtherThread(int32 QueueIndex, FBaseGraphTask* Task);
	// 唤醒线程
	virtual void WakeUp(int32 QueueIndex = 0);
	// 	当前线程是否正在处理任务
	virtual bool IsProcessingTasks(int32 QueueIndex) = 0;

	// ------单线程API(来自FSingleThreadRunnable)------
	// 初始化
	virtual bool Init() override;
	// 执行任务
	virtual uint32 Run() override
	// 提前退出
	virtual void Stop() override
	// 退出并清理数据
	virtual void Exit() override
	// 返回单线程接口
	virtual FSingleThreadRunnable* GetSingleThreadInterface() override

protected:
	ENamedThreads::Type	ThreadId; // 当前线程的Id
	uint32 PerThreadIDTLSSlot; // TLS(Thread Local Stage)插槽
	FThreadSafeCounter IsStalled; // 阻塞计数器,用来触发阻塞信号,大部分时间同步不安全
	TArray<FBaseGraphTask*> NewTasks; // 当前线程的任务列表
	FWorkerThread* OwnerWorker; // 指向所属的WorkerThread
};

FTaskThreadBase只是抽象类,具体的实现由两个子类FNamedTaskThread和FTaskThreadAnyThread完成。

FNamedTaskThread

NamedThread是TaskGraph外面创建的线程 (引擎会在合适的时机自动创建,不需要手动),创建后可以通过Attach操作绑定到TaskGraph上。
NamedThread有一个优先级的概念,每个线程拥有两个任务队列,MainQueue和LocalQueue

class FNamedTaskThread : public FTaskThreadBase
{
public:
	// NamedThread处理任务
	virtual void ProcessTasksUntilQuit(int32 QueueIndex);
	virtual uint64 ProcessTasksUntilIdle(int32 QueueIndex);
	// 真正的处理任务
	uint64 ProcessTasksNamedThread(int32 QueueIndex, bool bAllowStall);
	// 一些FTaskThreadBase的方法实现......
	virtual void EnqueueFromThisThread(int32 QueueIndex, FBaseGraphTask* Task);
	virtual void RequestQuit(int32 QueueIndex);
	virtual bool EnqueueFromOtherThread(int32 QueueIndex, FBaseGraphTask* Task);
	virtual void WakeUp(int32 QueueIndex = 0);
	virtual bool IsProcessingTasks(int32 QueueIndex) = 0;

private:
	// 定义了一个任务队列
	struct FThreadTaskQueue
	{
		FStallingTaskQueue<FBaseGraphTask, PLATFORM_CACHE_LINE_SIZE, 2> StallQueue;	// 阻塞的任务队列
		uint32 RecursionGuard; // 防止循环(递归)调用
		bool QuitForReturn; // 是否执行了一个返回任务
		bool QuitForShutdown; // 是否执行了一个退出任务
		FEvent*	StallRestartEvent; // 当前线程满载时的阻塞事件

		FThreadTaskQueue();
		~FThreadTaskQueue();
	};

	// 返回索引为QueueIndex的任务队列
	FORCEINLINE FThreadTaskQueue& Queue(int32 QueueIndex);
	FORCEINLINE const FThreadTaskQueue& Queue(int32 QueueIndex) const;
	// 两个任务队列
	FThreadTaskQueue Queues[ENamedThreads::NumQueues];
};

FTaskThreadAnyThread

AnyThread是在TaskGraph初始化得时候创建的工作线程,还包括了三种优先级:NormalThreadPriority,HighThreadPriority,BackgroundThreadPriority
AnyThread通常用来处理通用的多线程任务,所以创建的数量非常多,处理任务的方式也和NamedThread有所不同,主要就是通过FindWork从TaskGraph获取相应优先级的任务。

class FTaskThreadAnyThread : public FTaskThreadBase
{
public:
	// 构造函数,并指定该线程优先级
	FTaskThreadAnyThread(int32 InPriorityIndex): PriorityIndex(InPriorityIndex) {};
	// AnyThread 处理任务
	virtual void ProcessTasksUntilQuit(int32 QueueIndex);
	virtual uint64 ProcessTasksUntilIdle(int32 QueueIndex);
	// 一些FTaskThreadBase的方法实现......
	virtual void RequestQuit(int32 QueueIndex);
	virtual void WakeUp(int32 QueueIndex = 0);
	virtual bool IsProcessingTasks(int32 QueueIndex) = 0;

private:
	// 真正的处理任务
	uint64 ProcessTasks();

	// 定义了一个任务队列,与NamedThread的有些许不同
	struct FThreadTaskQueue
	{
		FEvent* StallRestartEvent; // 做完工作得阻塞事件
		uint32 RecursionGuard; // 防止循环(递归)调用
		bool QuitForShutdown; // 是否执行了一个退出任务
		bool bStallForTuning; // 是否需要暂停
		FCriticalSection StallForTuning; // 阻塞临界区

		FThreadTaskQueue()
		~FThreadTaskQueue()
	};

	// 从TaskGraph系统中获取任务
	FBaseGraphTask* FindWork()
	{
		return TaskGraphImplementationSingleton->FindWork(ThreadId);
	}

	FThreadTaskQueue Queue; // 任务队列
	int32 PriorityIndex; // 优先级
};

TaskGraph的线程类型就只有这两种,处理特定任务用NamedThread,平时处理通用的多线程任务则可以用AnyThread。

FWorkerThread

我们还可以从上面线程的基类中看到一个结构体:FWorkerThread* OwnerWorker;
这是TaskGraph里真正操作的线程对象,只是简单的将FTaskThread对象(继承FRunnable)和FRunnableThread对象封装起来了。

struct FWorkerThread
{
	FTaskThreadBase*	TaskGraphWorker; // FTaskThread对象,有可能是NamedThread或者AnyThread
	FRunnableThread*	RunnableThread; // 真正执行任务的线程
	bool				bAttached; // 线程是否已经附加到TaskGraph上,主要用在NamedThread

	FWorkerThread(): TaskGraphWorker(nullptr), RunnableThread(nullptr), bAttached(false) { }
};

TaskGraph System

看完了任务和线程,该看看本尊真面目了。
经典的写法就是整个系统分为FTaskGraphInterface和FTaskGraphImplementation两个部分,其中FTaskGraphInterface是一些父类接口,真正实现的部分则是在FTaskGraphImplementation里。

class FTaskGraphImplementation final : public FTaskGraphInterface
{
public:
	// 单例
	static FTaskGraphImplementation& Get();
	// 构造函数
	FTaskGraphImplementation(int32);
	// 析构函数
	virtual ~FTaskGraphImplementation();

	// -------继承自FTaskGraphInterface------
	// 入队任务
	virtual void QueueTask(FBaseGraphTask* Task, bool bWakeUpWorker, ENamedThreads::Type ThreadToExecuteOn, ENamedThreads::Type InCurrentThreadIfKnown = ENamedThreads::AnyThread) final override;
	// 获取各种线程数量
	virtual int32 GetNumWorkerThreads() final override;
	virtual int32 GetNumForegroundThreads() final override;
	virtual int32 GetNumBackgroundThreads() final override;
	// 获取当前线程信息
	virtual bool IsCurrentThreadKnown() final override;
	virtual ENamedThreads::Type GetCurrentThreadIfKnown(bool bLocalQueue) final override;
	// 该线程是否正在运行任务
	virtual bool IsThreadProcessingTasks(ENamedThreads::Type ThreadToCheck) final override;

	// ------外部线程(主要是NamedThread)API-------
	// 附加线程到TaskGraph系统中
	virtual void AttachToThread(ENamedThreads::Type CurrentThread) final override;
	// 处理任务
	virtual uint64 ProcessThreadUntilIdle(ENamedThreads::Type CurrentThread) final override;
	virtual void ProcessThreadUntilRequestReturn(ENamedThreads::Type CurrentThread) final override;
	// 请求返回
	virtual void RequestReturn(ENamedThreads::Type CurrentThread) final override;
	// 等待任务执行完成
	virtual void WaitUntilTasksComplete(const FGraphEventArray& Tasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread) final override;
	virtual void TriggerEventWhenTasksComplete(FEvent* InEvent, const FGraphEventArray& Tasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread, ENamedThreads::Type TriggerThread = ENamedThreads::AnyHiPriThreadHiPriTask) final override;
	// 添加一个停止的回调
	virtual void AddShutdownCallback(TFunction<void()>& Callback);
	// 唤醒NamedThread
	virtual void WakeNamedThread(ENamedThreads::Type ThreadToWake) override;

	// ------调度工具------
	// 启动线程
	void StartTaskThread(int32 Priority, int32 IndexToStart);
	void StartAllTaskThreads(bool bDoBackgroundThreads);
	// 返回一个任务给线程,从AnyThread线程调用
	FBaseGraphTask* FindWork(ENamedThreads::Type ThreadInNeed) override;
	// 暂停线程
	void StallForTuning(int32 Index, bool Stall) override
	// 设置FRunnableThread的优先级
	void SetTaskThreadPriorities(EThreadPriority Pri)

private:
	// 返回一个线程
	FTaskThreadBase& Thread(int32 Index);
	// 获取当前线程Type
	ENamedThreads::Type GetCurrentThread()
	// 获取线程的优先级
	int32 ThreadIndexToPriorityIndex(int32 ThreadIndex)

	FWorkerThread WorkerThreads[MAX_THREADS]; // 工作线程,封装了FTaskThread和FRunnableThread
	int32 NumThreads; // 实际使用的线程数量
	int32 NumNamedThreads; // 实际使用的NamedThread数量
	int32 NumTaskThreadSets; // AnyThread线程集合数
	int32 NumTaskThreadsPerSet; // 每个Anythread线程集合里包含的线程数
	bool bCreatedHiPriorityThreads; // 是否有高优先级AnyThread
	bool bCreatedBackgroundPriorityThreads; // 是否有后台AnyThread
	ENamedThreads::Type LastExternalThread;
	FThreadSafeCounter ReentrancyCheck;
	uint32 PerThreadIDTLSSlot; // 每个线程的PLS插槽
	TArray<TFunction<void()>> ShutdownCallbacks; // 停止的回调们
	FStallingTaskQueue<FBaseGraphTask, PLATFORM_CACHE_LINE_SIZE, 2> IncomingAnyThreadTasks[MAX_THREAD_PRIORITIES]; // 任务列表,包含了NamedThread的也包含AnyThread的
};

既然FTaskGraphImplementation是继承的FTaskGraphInterface,那我们直接看FTaskGraphImplementation就行。
看这个庞大的类,其实主要是用来管理AnyThread和它的任务的,TaskGraph会根据线程优先级等等条件来创建FWorkerThread,然后入队任务时将任务Push到IncomingAnyThreadTasks,每个AnyThread都可以通过FindWork来索取任务进行处理。
下面是一些辅助理解的图片:(感谢大佬们的图)

Reference

  1. 进程和线程的区别
  2. 剖析虚幻渲染体系(02)- 多线程渲染
  3. 面向数据编程与任务系统 | GAMES104-现代游戏引擎:从入门到实践
  4. UE4异步编程专题 - 多线程
  5. 《Exploring in UE4》多线程机制详解
  6. UE并发-TaskGraph的实现和用法
9
评论 2
收藏 4
  • MR7
    MR7
    感谢分享长文!我们会努力改进写作体验~
    展开1条回复