为了账号安全,请及时绑定邮箱和手机立即绑定

如何提高迭代 130 多个项目并将其上传到 aws s3 的性能

如何提高迭代 130 多个项目并将其上传到 aws s3 的性能

手掌心 2022-09-21 17:39:13

我必须迭代超过 130 个数据传输对象,每次都会生成一个要上传到 aws S3 的 json。


由于没有改进,整个过程大约需要90秒。我尝试使用兰巴而不是使用兰巴,两者的结果相同。


for(AbstractDTO dto: dtos) {

    try {

        processDTO(dealerCode, yearPeriod, monthPeriod, dto);

    } catch (FileAlreadyExistsInS3Exception e) {

        failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");

    }

}

dtos.stream().forEach(dto -> {

    try {

        processDTO(dealerCode, yearPeriod, monthPeriod, dto);

    } catch (FileAlreadyExistsInS3Exception e) {

        failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");

    }

});

经过一些调查,我得出结论,processDTO 方法每个项目大约需要0.650ms才能运行。


我的第一次尝试是使用并行流,结果非常好,大约需要15秒才能完成整个过程:


dtos.parallelStream().forEach(dto -> {

    try {

        processDTO(dealerCode, yearPeriod, monthPeriod, dto);

    } catch (FileAlreadyExistsInS3Exception e) {

        failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");

    }

});

但我仍然需要减少这段时间。我研究了如何改进并行流,并发现了分叉池技巧:


ForkJoinPool forkJoinPool = new ForkJoinPool(PARALLELISM_NUMBER);

forkJoinPool.submit(() ->

dtos.parallelStream().forEach(dto -> {

    try {

        processDTO(dealerCode, yearPeriod, monthPeriod, dto);

    } catch (FileAlreadyExistsInS3Exception e) {

        failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");

    }

})).get();

forkJoinPool.shutdown();

不幸的是,结果对我来说有点令人困惑。

  • 当PARALLELISM_NUMBER为8时,完成整个过程大约需要13秒。没有太大的改进。

  • 当PARALLELISM_NUMBER为16时,完成整个过程大约需要8秒钟

  • 当PARALLELISM_NUMBER为32时,完成整个过程大约需要5秒钟

所有测试都是使用 postman 请求完成的,调用控制器方法,最终将迭代 130 个项目

我对5秒感到满意,使用32秒作为PARALLELISM_NUMBER,但我担心后果。

  • 保留32可以吗?

  • 理想的PARALLELISM_NUMBER是什么?

  • 在决定其价值时,我必须记住什么?


查看完整描述

1 回答

?
回首忆惘然

TA贡献1534条经验 获得超10个赞

这些参数决定 将使用多少个线程。这就是为什么默认情况下,可用 CPU 内核计数是:parallelismForkJoinPoolparallelism

Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors())

在你的情况下,装瓶工应该检查文件是否存在并将其上传到S3。这里的时间将取决于至少几个因素:CPU,网卡和驱动程序,操作系统,其他。在您的案例中,S3 网络操作时间似乎没有 CPU 限制,因为您正在通过创建更多模拟工作线程来观察改进,也许网络请求由操作系统排队。

的正确值因工作负荷类型而异。由于上下文切换的负面影响,CPU 密集型工作流最好使用默认值等于 CPU 内核。像您这样的非CPU密集型工作负载可以通过更多的工作线程来加速,假设工作负载不会阻塞CPU,例如通过忙于等待parallelismparallelism

中没有一个单一的理想值。parallelismForkJoinPool

由于您所有有用的建议和解释,我设法减少到8秒。


由于瓶颈是上传到 aws s3,并且您提到了 aws 上的非阻塞 API,因此经过一些研究,我发现类 TransferManager 包含非阻塞上传。


转移管理器类


因此,我没有使用分叉连接池来增加线程数,而是保留了简单的并行流:


        dtos.parallelStream().forEach(dto -> {

            try {

                processDTO(dealerCode, yearPeriod, monthPeriod, dto);

            } catch (FileAlreadyExistsInS3Exception e) {

                failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");

            }

        });

上传到S3方法改变了一点,我没有使用亚马逊S3,而是使用了转移管理器:


public Upload uploadAsyncFileToS3(String fileName, String fileContent) throws FileAlreadyExistsInS3Exception {

        if (s3client.doesObjectExist(bucketName, fileName)) {

            throw new FileAlreadyExistsInS3Exception(ErrorMessages.FILE_ALREADY_EXISTS_IN_S3.getMessage());

        }

        InputStream targetStream = new ByteArrayInputStream(fileContent.getBytes());

        ObjectMetadata metadata = new ObjectMetadata();

        metadata.setContentLength(fileContent.getBytes().length);

        return transferManager.upload(bucketName, fileName, targetStream, metadata);

}

这样,当调用上载时,它不会等待它完成,从而允许处理另一个 DTO。处理完所有 DTO 后,我会检查其上传状态以查看可能的错误(在第一个 Each 之外)


查看完整回答
反对 回复 2022-09-21

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信