Skip to content

Commit

Permalink
增加一种线程池,按顺序执行
Browse files Browse the repository at this point in the history
  • Loading branch information
walterlv committed Dec 15, 2021
1 parent 13631ca commit c404dcb
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 7 deletions.
11 changes: 8 additions & 3 deletions dotnetCampus.Ipc.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.31729.503
# Visual Studio Version 17
VisualStudioVersion = 17.1.31911.260
MinimumVisualStudioVersion = 15.0.26124.0
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "dotnetCampus.Ipc", "src\dotnetCampus.Ipc\dotnetCampus.Ipc.csproj", "{F7ED61F4-920C-49EB-8DC1-74B2BE6AF272}"
EndProject
Expand All @@ -19,10 +19,15 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "PipeMvc", "PipeMvc", "{716A
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "dotnetCampus.Ipc.PipeMvcServer", "src\PipeMvc\dotnetCampus.Ipc.PipeMvcServer\dotnetCampus.Ipc.PipeMvcServer.csproj", "{0F6B9C0C-7A64-4B21-BC28-E52565245A1A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "dotnetCampus.Ipc.PipeMvcClient", "src\PipeMvc\dotnetCampus.Ipc.PipeMvcClient\dotnetCampus.Ipc.PipeMvcClient.csproj", "{1ACE3261-CC3D-4442-8C83-516721B3DA46}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "dotnetCampus.Ipc.PipeMvcClient", "src\PipeMvc\dotnetCampus.Ipc.PipeMvcClient\dotnetCampus.Ipc.PipeMvcClient.csproj", "{1ACE3261-CC3D-4442-8C83-516721B3DA46}"
EndProject
Project("{D954291E-2A0B-460D-934E-DC6B0785DB48}") = "dotnetCampus.Ipc.PipeMvcShare", "src\PipeMvc\dotnetCampus.Ipc.PipeMvcShare\dotnetCampus.Ipc.PipeMvcShare.shproj", "{05ACE3BB-61E5-4592-AC73-747850DB1081}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{11767A0B-CBFB-4323-BCEE-DE1CDD6C4B4B}"
ProjectSection(SolutionItems) = preProject
build\Version.props = build\Version.props
EndProjectSection
EndProject
Global
GlobalSection(SharedMSBuildProjectFiles) = preSolution
src\PipeMvc\dotnetCampus.Ipc.PipeMvcShare\dotnetCampus.Ipc.PipeMvcShare.projitems*{05ace3bb-61e5-4592-ac73-747850db1081}*SharedItemsImports = 13
Expand Down
6 changes: 6 additions & 0 deletions src/dotnetCampus.Ipc/Context/IpcConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;

using dotnetCampus.Ipc.Internals;
using dotnetCampus.Ipc.Threading;
using dotnetCampus.Ipc.Utils.Buffers;
using dotnetCampus.Ipc.Utils.Logging;

Expand Down Expand Up @@ -32,6 +33,11 @@ public class IpcConfiguration
/// </summary>
public ISharedArrayPool SharedArrayPool { get; set; } = new SharedArrayPool();

/// <summary>
/// 决定如何调度 IPC 通知到业务的代码。
/// </summary>
public IpcTaskScheduling IpcTaskScheduling { get; set; }

/// <summary>
/// 为 IPC 记录日志。
/// </summary>
Expand Down
11 changes: 9 additions & 2 deletions src/dotnetCampus.Ipc/Context/IpcContext.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using dotnetCampus.Ipc.CompilerServices.GeneratedProxies;
using dotnetCampus.Ipc.Internals;
using dotnetCampus.Ipc.Pipes;
using dotnetCampus.Ipc.Threading;
using dotnetCampus.Ipc.Threading.Tasks;
using dotnetCampus.Ipc.Utils.Logging;

Expand All @@ -16,7 +17,7 @@ public class IpcContext
/// </summary>
public const string DefaultPipeName = "dotnet campus";

private static readonly IpcTask DefaultIpcTask = new();
private static readonly IpcTask DefaultIpcTask = new(new IpcThreadPool());

/// <summary>
/// 创建上下文
Expand All @@ -35,6 +36,12 @@ public IpcContext(IpcProvider ipcProvider, string pipeName, IpcConfiguration? ip
IpcConfiguration = ipcConfiguration ?? new IpcConfiguration();
GeneratedProxyJointIpcContext = new GeneratedProxyJointIpcContext(this);

TaskPool = IpcConfiguration.IpcTaskScheduling is IpcTaskScheduling.GlobalConcurrent
// 支持并发的 IPC 将共用同一个线程池。
? DefaultIpcTask
// 要求在同一线程调度的 IPC 将近似独享一个“线程”。
: new IpcTask(new IpcSingleThreadPool());

Logger = IpcConfiguration.IpcLoggerProvider?.Invoke(pipeName) ?? new IpcLogger(pipeName);
}

Expand Down Expand Up @@ -73,7 +80,7 @@ public override string ToString()
/// 2. 大多数为小型任务,但可能会出现一些难以预料到的长时间的任务;
/// 3. 不阻塞调用线程。
/// </summary>
internal IpcTask TaskPool { get; } = DefaultIpcTask;
internal IpcTask TaskPool { get; }

// 当前干掉回应的逻辑
///// <summary>
Expand Down
27 changes: 27 additions & 0 deletions src/dotnetCampus.Ipc/Threading/IIpcThreadPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System;
using System.Threading.Tasks;

using dotnetCampus.Ipc.Utils.Logging;

namespace dotnetCampus.Ipc.Threading
{
/// <summary>
/// 提供给 IPC 框架内部使用的线程池。
/// </summary>
internal interface IIpcThreadPool
{
/// <summary>
/// 在线程池挑选一个线程执行指定代码。
/// 当任务确定已经开始执行之后就会返回第一层 <see cref="Task"/>,
/// 在任务和超时时间先完成者会再次返回第二层 <see cref="Task"/>。
/// </summary>
/// <param name="action"></param>
/// <param name="logger"></param>
/// <returns></returns>
/// <remarks>
/// <para>特别注意!!!</para>
/// 此方法不是线程安全的,调用方必须确保此方法的调用处于临界区。
/// </remarks>
Task<Task> Run(Action action, ILogger? logger);
}
}
21 changes: 21 additions & 0 deletions src/dotnetCampus.Ipc/Threading/IpcSingleThreadPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System;
using System.Threading.Tasks;

using dotnetCampus.Ipc.Utils.Logging;

namespace dotnetCampus.Ipc.Threading
{
/// <summary>
/// 提供给 IPC 框架内部使用的线程池。
/// 所有的调度都将确定地按顺序执行,一个执行完毕后才会执行下一个。
/// </summary>
internal class IpcSingleThreadPool : IIpcThreadPool
{
public async Task<Task> Run(Action action, ILogger? logger)
{
// 因为此方法的调用方能保证依次执行而不并发,所以这里直接 Task.Run 也不会浪费线程。
await Task.Run(action).ConfigureAwait(false);
return Task.FromResult(0);
}
}
}
25 changes: 25 additions & 0 deletions src/dotnetCampus.Ipc/Threading/IpcTaskScheduling.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
namespace dotnetCampus.Ipc.Threading
{
/// <summary>
/// 决定 IPC 消息抵达时,以何种调度方式通知给业务代码。
/// </summary>
public enum IpcTaskScheduling
{
/// <summary>
/// 所有的 IPC 共享同一个调度线程池。以大体上按顺序(但不保证)的并发方式通知。
/// </summary>
GlobalConcurrent,

/// <summary>
/// 按顺序依次通知。
/// </summary>
/// <remarks>
/// <para>使用前请避免在业务中创造死锁条件:</para>
/// <list type="bullet">
/// <item>A 向 B 发送消息,但 B 为了回 A 的这条消息,需要先向 A 请求某种值。</item>
/// </list>
/// <para>在这种情况下,按顺序的发送方式将导致死锁。</para>
/// </remarks>
OneByOne,
}
}
2 changes: 1 addition & 1 deletion src/dotnetCampus.Ipc/Threading/IpcThreadPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace dotnetCampus.Ipc.Threading
/// 此类型的所有公共方法都不是线程安全的,因此你需要确保在临界区调用这些代码。
/// 但内部方法是线程安全的。
/// </summary>
internal sealed class IpcThreadPool
internal sealed class IpcThreadPool : IIpcThreadPool
{
/// <summary>
/// 为每一个创建的线程名称准备一个序号。
Expand Down
7 changes: 6 additions & 1 deletion src/dotnetCampus.Ipc/Threading/Tasks/IpcTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@ internal sealed class IpcTask
{
private volatile int _isRunning;
private readonly ConcurrentQueue<TaskItem> _queue = new();
private readonly IpcThreadPool _threadPool = new();
private readonly IIpcThreadPool _threadPool;

/// <summary>
/// 由于原子操作仅提供高性能的并发处理而不保证准确性,因此需要一个锁来同步 <see cref="_isRunning"/> 中值为 0 时所指的不确定情况。
/// 不能使用一个锁来同步所有情况是因为在锁中使用 async/await 是不安全的,因此避免在锁中执行异步任务;我们使用原子操作来判断异步任务的执行条件。
/// </summary>
private readonly object _locker = new();

public IpcTask(IIpcThreadPool threadPool)
{
_threadPool = threadPool;
}

/// <summary>
/// 支持并发进入的 IPC 任务。
/// 被此 <see cref="IpcTask"/> 管理的异步任务将按调用此方法的顺序依次开始执行,至于开始后多少个任务可以同时运行或者执行多长时间后算超时而执行下一个取决于此 <see cref="IpcTask"/> 的配置。
Expand Down

0 comments on commit c404dcb

Please sign in to comment.