电脑桌面
添加蚂蚁七词文库到电脑桌面
安装后可以在桌面快捷访问

通过一个示例形象地理解C# async await 非并行异步、并行异步、并行异步的并发量控制

来源:金蝶云社区作者:金蝶2024-09-162

通过一个示例形象地理解C# async await 非并行异步、并行异步、并行异步的并发量控制

前言

接上一篇 通过一个示例形象地理解C# async await异步
我在 .NET与大数据 中吐槽前同事在双层循环体中(肯定是单线程了)频繁请求es,导致接口的总耗时很长。这不能怪前同事,确实难写,会使代码复杂度增加。
评论区有人说他的理解是使用异步增加了系统吞吐能力,这个理解是正确的,但对于单个接口的单次请求而言,它是单线程的,耗时反而可能比同步还慢。如何缩短单个接口的单次请求的时间呢(要求:尽量不增加代码复杂度)?请看下文。

示例的测试步骤

先直接测试,看结果,下面再放代码

  1. 点击VS2022的启动按钮,启动程序,它会先启动Server工程,再启动AsyncAwaitDemo2工程

  2. 分别点击三个button

  3. 观察思考输出结果

测试截图

非并行异步(顺序执行的异步)


截图说明:单次请求耗时约0.5秒,共10次请求,耗时约 0.5秒×10=5秒

并行异步


截图说明:单次请求耗时约0.5秒,共10次请求,耗时约 0.5秒

并行异步(控制并发数量)


截图说明:单次请求耗时约0.5秒,共10次请求,并发数是5,耗时约 0.5秒×10÷5=1秒

服务端
服务端和客户端是两个独立的工程,测试时在一起跑,但其实可以分开部署,部署到不同的机器上
服务端是一个web api接口,用.NET 6、VS2022开发,代码如下:

[ApiController]
[Route("[controller]")]
public class TestController : ControllerBase
{
    [HttpGet]
    [Route("[action]")]
    public async Task<Dictionary<int, int>> Get(int i)
    {
        var result = new Dictionary<int, int>();

        await Task.Delay(500); //模拟耗时操作

        if (i == 0)
        {
            result.Add(0, 5);
            result.Add(1, 4);
            result.Add(2, 3);
            result.Add(3, 2);
            result.Add(4, 1);
        }        else if (i == 1)
        {
            result.Add(0, 10);
            result.Add(1, 9);
            result.Add(2, 8);
            result.Add(3, 7);
            result.Add(4, 6);
        }        return result;
    }
}

客户端
大家看客户端代码时,不需要关心服务端怎么写
客户端是一个Winform工程,用.NET 6、VS2022开发,代码如下:

public partial class Form1 : Form
{
    private readonly string _url = "http://localhost:5028/Test/Get";

    public Form1()
    {
        InitializeComponent();
    }

    private async void Form1_Load(object sender, EventArgs e)
    {        //预热
        HttpClient httpClient = HttpClientFactory.GetClient();
        await (await httpClient.GetAsync(_url)).Content.ReadAsStringAsync();
    }    //非并行异步(顺序执行的异步)
    private async void button3_Click(object sender, EventArgs e)
    {
        await Task.Run(async () =>
        {
            Log($"==== 非并行异步 开始,线程ID={Thread.CurrentThread.ManagedThreadId} ========================");
            Stopwatch sw = Stopwatch.StartNew();
            HttpClient httpClient = HttpClientFactory.GetClient();
            var tasks = new Dictionary<string, Task<string>>();
            StringBuilder sb = new StringBuilder();            for (int i = 0; i < 2; i++)
            {                int sum = 0;                for (int j = 0; j < 5; j++)
                {
                    Dictionary<int, int> dict = await RequestAsync(_url, i);                    if (dict.ContainsKey(j))
                    {                        int num = dict[j];
                        sum += num;
                        sb.Append($"{num}, ");
                    }
                }
                Log($"输出:sum={sum}");
            }
            Log($"输出:{sb}");
            sw.Stop();
            Log($"==== 结束,线程ID={Thread.CurrentThread.ManagedThreadId},耗时:{sw.Elapsed.TotalSeconds:0.000}秒 ========================");
        });
    }    // 并行异步
    private async void button4_Click(object sender, EventArgs e)
    {
        await Task.Run(async () =>
        {
            Log($"==== 并行异步 开始,线程ID={Thread.CurrentThread.ManagedThreadId} ========================");
            Stopwatch sw = Stopwatch.StartNew();
            HttpClient httpClient = HttpClientFactory.GetClient();
            var tasks = new Dictionary<string, Task<Dictionary<int, int>>>();
            StringBuilder sb = new StringBuilder();            //双层循环写第一遍
            for (int i = 0; i < 2; i++)
            {                for (int j = 0; j < 5; j++)
                {
                    var task = RequestAsync(_url, i);
                    tasks.Add($"{i}_{j}", task);
                }
            }            //双层循环写第二遍
            for (int i = 0; i < 2; i++)
            {                int sum = 0;                for (int j = 0; j < 5; j++)
                {
                    Dictionary<int, int> dict = await tasks[$"{i}_{j}"];                    if (dict.ContainsKey(j))
                    {                        int num = dict[j];
                        sum += num;
                        sb.Append($"{num}, ");
                    }
                }
                Log($"输出:sum={sum}");
            }
            Log($"输出:{sb}");
            sw.Stop();
            Log($"==== 结束,线程ID={Thread.CurrentThread.ManagedThreadId},耗时:{sw.Elapsed.TotalSeconds:0.000}秒 ========================");
        });
    }    // 并行异步(控制并发数量)
    private async void button5_Click(object sender, EventArgs e)
    {
        await Task.Run(async () =>
        {
            Log($"==== 并行异步(控制并发数量) 开始,线程ID={Thread.CurrentThread.ManagedThreadId} ===================");
            Stopwatch sw = Stopwatch.StartNew();
            HttpClient httpClient = HttpClientFactory.GetClient();
            var tasks = new Dictionary<string, Task<Dictionary<int, int>>>();
            Semaphore sem = new Semaphore(5, 5);
            StringBuilder sb = new StringBuilder();            //双层循环写第一遍
            for (int i = 0; i < 2; i++)
            {                for (int j = 0; j < 5; j++)
                {
                    var task = RequestAsync(_url, i, sem);
                    tasks.Add($"{i}_{j}", task);
                }
            }            //双层循环写第二遍
            for (int i = 0; i < 2; i++)
            {                int sum = 0;                for (int j = 0; j < 5; j++)
                {
                    Dictionary<int, int> dict = await tasks[$"{i}_{j}"];                    if (dict.ContainsKey(j))
                    {                        int num = dict[j];
                        sum += num;
                        sb.Append($"{num}, ");
                    }
                }
                Log($"输出:sum={sum}");
            }
            sem.Dispose(); //别忘了释放
            Log($"输出:{sb}");
            sw.Stop();
            Log($"==== 结束,线程ID={Thread.CurrentThread.ManagedThreadId},耗时:{sw.Elapsed.TotalSeconds:0.000}秒 ========================");
        });
    }

    private async Task<Dictionary<int, int>> RequestAsync(string url, int i)
    {
        Stopwatch sw = Stopwatch.StartNew();
        HttpClient httpClient = HttpClientFactory.GetClient();
        var result = await (await httpClient.GetAsync($"{url}?i={i}")).Content.ReadAsStringAsync();
        sw.Stop();
        Log($"线程ID={Thread.CurrentThread.ManagedThreadId},请求耗时:{sw.Elapsed.TotalSeconds:0.000}秒");        return JsonSerializer.Deserialize<Dictionary<int, int>>(result);
    }

    private async Task<Dictionary<int, int>> RequestAsync(string url, int i, Semaphore semaphore)
    {
        semaphore.WaitOne();
        try
        {
            Stopwatch sw = Stopwatch.StartNew();
            HttpClient httpClient = HttpClientFactory.GetClient();
            var result = await (await httpClient.GetAsync($"{url}?i={i}")).Content.ReadAsStringAsync();
            sw.Stop();
            Log($"线程ID={Thread.CurrentThread.ManagedThreadId},请求耗时:{sw.Elapsed.TotalSeconds:0.000}秒");            return JsonSerializer.Deserialize<Dictionary<int, int>>(result);
        }
        catch (Exception ex)
        {
            Log($"错误:{ex}");
            throw;
        }
        finally
        {
            semaphore.Release();
        }
    }    #region Log
    private void Log(string msg)
    {
        msg = $"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")}  {msg}\r\n";        if (this.InvokeRequired)
        {
            this.BeginInvoke(new Action(() =>
            {
                txtLog.AppendText(msg);
            }));
        }        else
        {
            txtLog.AppendText(msg);
        }
    }    #endregion

    private void button6_Click(object sender, EventArgs e)
    {
        txtLog.Text = string.Empty;
    }
}

思考

1. 使用Semaphore的注意事项

  1. 如果是Winform程序,可以在button事件方法中定义它的局部变量。如果是WebAPI接口服务,请在接口方法中定义Semaphore的局部变量。注意,别定义成全局的,或者定义成静态的,或者定义成Controller的成员变量,那样会严重限制使用它的接口的吞吐能力!

  2. 用完调用Dispose释放

2. 尽量不增加代码复杂度

请思考代码中的注释"双层循环写第一遍""双层循环写第二遍",这个写法尽量不增加代码复杂度,试想一下,如果你用Task.Run,且不说占用线程,就问你怎么写能简单?
有人说,这题我会,这样写不就行了:

Dictionary<int, int>[] result = await Task.WhenAll(tasks.Values);

那请问,你接下来怎么写?我相信你肯定会写,但问题是,代码的逻辑结构变了,代码复杂度增加了!
所以"双层循环写第一遍""双层循环写第二遍"是什么意思?你即能方便合并,又能方便拆分,代码逻辑结构没变,只是复制了一份。

3. RequestAsync的复杂度可控

RequestAsync的复杂度

通过一个示例形象地理解C# async await 非并行异步、并行异步、并行异步的并发量控制

前言接上一篇 通过一个示例形象地理解C# async await异步我在 .NET与大数据 中吐槽前同事在双层循环体中(肯定是单线程了)频繁请求es,导...
点击下载文档文档为doc格式

声明:除非特别标注,否则均为本站原创文章,转载时请以链接形式注明文章出处。如若本站内容侵犯了原著者的合法权益,可联系本站删除。

已经是第一篇
确认删除?
回到顶部
客服QQ
  • 客服QQ点击这里给我发消息
QQ群
  • 答案:my7c点击这里加入QQ群
支持邮箱
微信
  • 微信