Nvidia Developer, February 16
Accelerating GPU Analytics Using RAPIDS and Ray

 

This article explores how Ray and RAPIDS can be used to accelerate novel analytics pipelines. RAPIDS is a suite of open-source GPU-accelerated data science and AI libraries that works well with distributed engines such as Spark and Dask. Ray is a popular open-source distributed Python framework commonly used to scale AI and machine learning (ML) applications. Ray particularly excels at simplifying and scaling training and inference pipelines and can easily target both CPU and GPU devices. The article focuses on how Ray Actors (stateful workers) integrate with RAPIDS, using cuDF to accelerate IO and NCCL plus cuGraph for distributed, accelerated GPU computing, and closes with cuGraph's weakly connected components (WCC) implementation as an example of how powerful Ray and RAPIDS are when used together.

🚀 Ray Actors are the core of Ray: stateful workers that can store, manage, and mutate data. For example, you can load data on the GPU with cuDF, create multiple actors through Ray to accelerate IO, and combine this with RAPIDS optimizations (RMM memory configuration) or common ETL routines such as filtering and custom, user-defined functions.

🤝 Many RAPIDS algorithm implementations are already built in C++ for distributed, accelerated GPU computing, relying on NCCL for accelerated communication and on the primitives and solvers in RAFT. RAFT primitives are used across several RAPIDS libraries, including cuML and cuGraph.

🔗 cuGraph's weakly connected components (WCC) implementation is largely based on pipelining already-cleaned data from disk to the lower-level CUDA C++ implementation as quickly as possible. Developers can combine RAPIDS (cuGraph) with Ray to gain access to powerful distributed, accelerated algorithms.

RAPIDS is a suite of open-source GPU-accelerated data science and AI libraries that are well supported for scale-out with distributed engines like Spark and Dask. Ray is a popular open-source distributed Python framework commonly used to scale AI and machine learning (ML) applications. Ray particularly excels at simplifying and scaling training and inference pipelines and can easily target both CPU and GPU devices. In this post, we explore how Ray and RAPIDS can be used to accelerate novel analytics pipelines.

Ray Actors

While Ray offers high-level abstractions for training and serving ML models, we will be experimenting with the core of Ray, specifically with Ray Actors. Actors are stateful workers, meaning that each worker can store, manage, and mutate any data it holds. For example, if you want to use cuDF to load some data on GPU, you could do the following:

```python
import ray
import cudf


@ray.remote(num_gpus=1)
class cuDFActor:
    def __init__(self):
        ...

    def read_parquet(self, filepath: str, columns: list = None) -> cudf.DataFrame:
        return cudf.read_parquet(filepath, columns=columns)


# Start 4 workers
pool_size = 4
actor_pool = [cuDFActor.remote() for i in range(pool_size)]
```

This example uses Ray to create four actors on four GPUs and cuDF to accelerate IO. It can be combined with other RAPIDS optimizations (such as memory configuration with RMM) or with common ETL routines like filtering and custom, user-defined functions; see the cuDF Ray actor example.

Ray Actors are wonderfully general: they can be quickly leveraged to parallelize Python libraries and integrate easily with existing distributed algorithms. Additionally, with Ray, you can easily scale this work across multiple GPUs and multiple nodes.
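The RMM and ETL extensions are only mentioned in passing above, so the following is a minimal sketch of how they might be wired into such an actor. The class name `ETLActor`, the 2 GiB pool size, the `wgt` column, and the `filter_and_scale` method are illustrative assumptions, not code from the post.

```python
import ray
import cudf
import rmm


@ray.remote(num_gpus=1)
class ETLActor:
    def __init__(self):
        # Assumed RMM setup: carve out a 2 GiB memory pool on the GPU that
        # Ray assigned to this actor, reducing allocation overhead during ETL.
        rmm.reinitialize(pool_allocator=True, initial_pool_size=2 * 2**30)

    def read_parquet(self, filepath: str, columns: list = None) -> cudf.DataFrame:
        return cudf.read_parquet(filepath, columns=columns)

    def filter_and_scale(self, df: cudf.DataFrame) -> cudf.DataFrame:
        # Illustrative ETL step: keep rows with positive weights and add a
        # derived column ("wgt" is an assumed column name).
        kept = df[df["wgt"] > 0]
        return kept.assign(wgt_scaled=kept["wgt"] * 2.0)
```

Because each actor runs in its own process with its own GPU assignment from Ray, performing the RMM configuration inside `__init__` keeps the memory pool local to that actor's device.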
NCCL and cuGraph

Many RAPIDS implementations of popular algorithms are already built for distributed, accelerated GPU computing in C++. These implementations are highly tuned and rely on accelerated communications with NCCL and on the primitives and solvers found in RAFT (pairwise distances, k-means clustering, iterative solvers, and more). RAFT primitives are used in several RAPIDS libraries, including cuML and cuGraph.

The cuGraph weakly connected components (WCC) implementation, for example, is largely based on pipelining already-cleaned data as quickly as possible from disk to the lower-level CUDA C++ implementation. WCC is a good goal for demonstrating how developers can use RAPIDS (cuGraph) and Ray together to gain access to powerful, distributed, accelerated algorithms. Implementing WCC requires the following:

1. Loading data into GPU memory
2. Starting NCCL comms (and the cuGraph sub-communicator)
3. Instantiating and configuring the internal multi-GPU cuGraph implementation
4. Executing WCC

The first step has already been demonstrated. And while Ray has its own NCCL hooks, we will rely on the RAFT NCCL interfaces to manage communications, given cuGraph's hard dependency on them. The following stubs out the requirements just outlined:

```python
class RAFTActor:
    def __init__(self, index, pool_size, session_id, actor_name_prefix=None):
        ...

    def broadcast_root_unique_id(self):
        # Broadcast root/rank-0 to all actors
        ...

    def _setup_nccl(self):
        # Start NCCL with the identified rank-0 actor
        ...

    def _setup_raft(self):
        # Configure RAFT and NCCL together
        ...

    def set_root_unique_id(self, root_uniqueId):
        # To be set by rank-0 for all actors
        ...


@ray.remote(num_gpus=1)
class WCCActor(RAFTActor):
    def __init__(self, index, pool_size, session_id):
        super().__init__(
            index=index,
            pool_size=pool_size,
            session_id=session_id,
            actor_name_prefix="WCC",
        )

    def weakly_connected_components(self, df):
        """
        1. Each actor loads in a chunk
        2. Each actor has a NCCL/RAFT handle
        3. Pass each chunk and handle to MGGraph
        """
        src_array = df["src"]
        dst_array = df["dst"]
        weights = df["wgt"]

        # Configure and set up a multi-GPU cuGraph object with
        # edge list data and NCCL
        graph = MGGraph(src_array, dst_array, weights, ...)

        # Execute WCC
        weakly_connected_components(graph)


# Initialize Ray and run the WCC algorithm
```

This covers the two classes required to run cuGraph weakly connected components. To learn more, see an implementation of weakly connected components. Much of the work is configuring NCCL/RAFT. This same pattern works for other libraries like cuML, as demonstrated with the cuML k-means implementation.

Conclusion

Ray provides an expressive and scalable Actor interface that can be easily leveraged with RAPIDS. We've explored how to connect Ray Actors to optimized CUDA C++ and NCCL implementations. This exploration has largely focused on Level 1 integration, with Ray Actors acting as launchers. To learn more about GPU-accelerated data processing, join the 3,500+ members in the RAPIDS Slack community.
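Returning to the WCC example: the listing above ends at the step labeled "Initialize Ray and Run WCC algorithm" without showing the driver itself. As a closing usage sketch, here is one way that step might look, assuming the WCCActor class defined above. The parquet paths, the session identifier, and the exact NCCL bootstrap call are assumptions for illustration; the linked WCC implementation shows the full version.

```python
import ray
import cudf

# Hypothetical driver for the WCCActor defined above.
ray.init()

pool_size = 4
session_id = "wcc-demo"  # illustrative session identifier
actors = [
    WCCActor.remote(index=i, pool_size=pool_size, session_id=session_id)
    for i in range(pool_size)
]

# Rank 0 shares its NCCL unique ID with the other actors so that NCCL/RAFT
# comms can be established (following the RAFTActor stubs above).
ray.get(actors[0].broadcast_root_unique_id.remote())

# For simplicity, this sketch loads each edge-list chunk on the driver and
# ships it to an actor; in the post's description, each actor loads its own
# chunk with cuDF instead.
paths = [f"edges_{i}.parquet" for i in range(pool_size)]  # assumed paths
futures = [
    actor.weakly_connected_components.remote(cudf.read_parquet(path))
    for actor, path in zip(actors, paths)
]
ray.get(futures)

ray.shutdown()
```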
