通过一个示例形象地理解C# async await 非并行异步、并行异步、并行异步的并发量控制

前言
接上一篇 通过一个示例形象地理解C# async await异步
我在 .NET与大数据 中吐槽前同事在双层循环体中(肯定是单线程了)频繁请求es,导致接口的总耗时很长。这不能怪前同事,确实难写,会使代码复杂度增加。
评论区有人说他的理解是使用异步增加了系统吞吐能力,这个理解是正确的,但对于单个接口的单次请求而言,它是单线程的,耗时反而可能比同步还慢。如何缩短单个接口的单次请求的时间呢(要求:尽量不增加代码复杂度)?请看下文。
示例的测试步骤
先直接测试,看结果,下面再放代码
点击VS2022的启动按钮,启动程序,它会先启动Server工程,再启动AsyncAwaitDemo2工程
分别点击三个button
观察思考输出结果
测试截图
非并行异步(顺序执行的异步)

截图说明:单次请求耗时约0.5秒,共10次请求,耗时约 0.5秒×10=5秒
并行异步

截图说明:单次请求耗时约0.5秒,共10次请求,耗时约 0.5秒
并行异步(控制并发数量)

截图说明:单次请求耗时约0.5秒,共10次请求,并发数是5,耗时约 0.5秒×10÷5=1秒
服务端
服务端和客户端是两个独立的工程,测试时在一起跑,但其实可以分开部署,部署到不同的机器上
服务端是一个web api接口,用.NET 6、VS2022开发,代码如下:
[ApiController]
[Route("[controller]")]
public class TestController : ControllerBase
{
[HttpGet]
[Route("[action]")]
public async Task<Dictionary<int, int>> Get(int i)
{
var result = new Dictionary<int, int>();
await Task.Delay(500); //模拟耗时操作
if (i == 0)
{
result.Add(0, 5);
result.Add(1, 4);
result.Add(2, 3);
result.Add(3, 2);
result.Add(4, 1);
} else if (i == 1)
{
result.Add(0, 10);
result.Add(1, 9);
result.Add(2, 8);
result.Add(3, 7);
result.Add(4, 6);
} return result;
}
}客户端
大家看客户端代码时,不需要关心服务端怎么写
客户端是一个Winform工程,用.NET 6、VS2022开发,代码如下:
public partial class Form1 : Form
{
private readonly string _url = "http://localhost:5028/Test/Get";
public Form1()
{
InitializeComponent();
}
private async void Form1_Load(object sender, EventArgs e)
{ //预热
HttpClient httpClient = HttpClientFactory.GetClient();
await (await httpClient.GetAsync(_url)).Content.ReadAsStringAsync();
} //非并行异步(顺序执行的异步)
private async void button3_Click(object sender, EventArgs e)
{
await Task.Run(async () =>
{
Log($"==== 非并行异步 开始,线程ID={Thread.CurrentThread.ManagedThreadId} ========================");
Stopwatch sw = Stopwatch.StartNew();
HttpClient httpClient = HttpClientFactory.GetClient();
var tasks = new Dictionary<string, Task<string>>();
StringBuilder sb = new StringBuilder(); for (int i = 0; i < 2; i++)
{ int sum = 0; for (int j = 0; j < 5; j++)
{
Dictionary<int, int> dict = await RequestAsync(_url, i); if (dict.ContainsKey(j))
{ int num = dict[j];
sum += num;
sb.Append($"{num}, ");
}
}
Log($"输出:sum={sum}");
}
Log($"输出:{sb}");
sw.Stop();
Log($"==== 结束,线程ID={Thread.CurrentThread.ManagedThreadId},耗时:{sw.Elapsed.TotalSeconds:0.000}秒 ========================");
});
} // 并行异步
private async void button4_Click(object sender, EventArgs e)
{
await Task.Run(async () =>
{
Log($"==== 并行异步 开始,线程ID={Thread.CurrentThread.ManagedThreadId} ========================");
Stopwatch sw = Stopwatch.StartNew();
HttpClient httpClient = HttpClientFactory.GetClient();
var tasks = new Dictionary<string, Task<Dictionary<int, int>>>();
StringBuilder sb = new StringBuilder(); //双层循环写第一遍
for (int i = 0; i < 2; i++)
{ for (int j = 0; j < 5; j++)
{
var task = RequestAsync(_url, i);
tasks.Add($"{i}_{j}", task);
}
} //双层循环写第二遍
for (int i = 0; i < 2; i++)
{ int sum = 0; for (int j = 0; j < 5; j++)
{
Dictionary<int, int> dict = await tasks[$"{i}_{j}"]; if (dict.ContainsKey(j))
{ int num = dict[j];
sum += num;
sb.Append($"{num}, ");
}
}
Log($"输出:sum={sum}");
}
Log($"输出:{sb}");
sw.Stop();
Log($"==== 结束,线程ID={Thread.CurrentThread.ManagedThreadId},耗时:{sw.Elapsed.TotalSeconds:0.000}秒 ========================");
});
} // 并行异步(控制并发数量)
private async void button5_Click(object sender, EventArgs e)
{
await Task.Run(async () =>
{
Log($"==== 并行异步(控制并发数量) 开始,线程ID={Thread.CurrentThread.ManagedThreadId} ===================");
Stopwatch sw = Stopwatch.StartNew();
HttpClient httpClient = HttpClientFactory.GetClient();
var tasks = new Dictionary<string, Task<Dictionary<int, int>>>();
Semaphore sem = new Semaphore(5, 5);
StringBuilder sb = new StringBuilder(); //双层循环写第一遍
for (int i = 0; i < 2; i++)
{ for (int j = 0; j < 5; j++)
{
var task = RequestAsync(_url, i, sem);
tasks.Add($"{i}_{j}", task);
}
} //双层循环写第二遍
for (int i = 0; i < 2; i++)
{ int sum = 0; for (int j = 0; j < 5; j++)
{
Dictionary<int, int> dict = await tasks[$"{i}_{j}"]; if (dict.ContainsKey(j))
{ int num = dict[j];
sum += num;
sb.Append($"{num}, ");
}
}
Log($"输出:sum={sum}");
}
sem.Dispose(); //别忘了释放
Log($"输出:{sb}");
sw.Stop();
Log($"==== 结束,线程ID={Thread.CurrentThread.ManagedThreadId},耗时:{sw.Elapsed.TotalSeconds:0.000}秒 ========================");
});
}
private async Task<Dictionary<int, int>> RequestAsync(string url, int i)
{
Stopwatch sw = Stopwatch.StartNew();
HttpClient httpClient = HttpClientFactory.GetClient();
var result = await (await httpClient.GetAsync($"{url}?i={i}")).Content.ReadAsStringAsync();
sw.Stop();
Log($"线程ID={Thread.CurrentThread.ManagedThreadId},请求耗时:{sw.Elapsed.TotalSeconds:0.000}秒"); return JsonSerializer.Deserialize<Dictionary<int, int>>(result);
}
private async Task<Dictionary<int, int>> RequestAsync(string url, int i, Semaphore semaphore)
{
semaphore.WaitOne();
try
{
Stopwatch sw = Stopwatch.StartNew();
HttpClient httpClient = HttpClientFactory.GetClient();
var result = await (await httpClient.GetAsync($"{url}?i={i}")).Content.ReadAsStringAsync();
sw.Stop();
Log($"线程ID={Thread.CurrentThread.ManagedThreadId},请求耗时:{sw.Elapsed.TotalSeconds:0.000}秒"); return JsonSerializer.Deserialize<Dictionary<int, int>>(result);
}
catch (Exception ex)
{
Log($"错误:{ex}");
throw;
}
finally
{
semaphore.Release();
}
} #region Log
private void Log(string msg)
{
msg = $"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")} {msg}\r\n"; if (this.InvokeRequired)
{
this.BeginInvoke(new Action(() =>
{
txtLog.AppendText(msg);
}));
} else
{
txtLog.AppendText(msg);
}
} #endregion
private void button6_Click(object sender, EventArgs e)
{
txtLog.Text = string.Empty;
}
}思考
1. 使用Semaphore的注意事项
如果是Winform程序,可以在button事件方法中定义它的局部变量。如果是WebAPI接口服务,请在接口方法中定义Semaphore的局部变量。注意,别定义成全局的,或者定义成静态的,或者定义成Controller的成员变量,那样会严重限制使用它的接口的吞吐能力!
用完调用Dispose释放
2. 尽量不增加代码复杂度
请思考代码中的注释"双层循环写第一遍""双层循环写第二遍",这个写法尽量不增加代码复杂度,试想一下,如果你用Task.Run,且不说占用线程,就问你怎么写能简单?
有人说,这题我会,这样写不就行了:
Dictionary<int, int>[] result = await Task.WhenAll(tasks.Values);
那请问,你接下来怎么写?我相信你肯定会写,但问题是,代码的逻辑结构变了,代码复杂度增加了!
所以"双层循环写第一遍""双层循环写第二遍"是什么意思?你即能方便合并,又能方便拆分,代码逻辑结构没变,只是复制了一份。
3. RequestAsync的复杂度可控
RequestAsync的复杂度
通过一个示例形象地理解C# async await 非并行异步、并行异步、并行异步的并发量控制
声明:除非特别标注,否则均为本站原创文章,转载时请以链接形式注明文章出处。如若本站内容侵犯了原著者的合法权益,可联系本站删除。



