Ray: Scale Python code từ laptop lên cluster mà không cần đổi gì
Tìm hiểu Ray engine giúp bạn scale ứng dụng AI/ML từ máy local lên hàng trăm node chỉ với vài dòng decorator, kèm hướng dẫn thực hành chi tiết.
Nguyễn Nhật Long
@nguyennhatlong1303
Ray: Scale Python code từ laptop lên cluster mà không cần đổi gì
Mình nhớ hồi mới bắt đầu làm ML pipeline cho một dự án recommendation engine, cái đau đầu nhất không phải là chọn model hay tune hyperparameter mà là lúc model chạy ngon trên laptop rồi, giờ muốn train trên dataset lớn hơn gấp 100 lần thì phải làm sao. Viết lại bằng Spark? Setup Dask cluster? Hay tự tay viết multiprocessing rồi coordinate giữa các node? Mỗi option đều có trade-off riêng, và cái nào cũng đòi hỏi phải refactor code đáng kể.
Rồi mình gặp Ray. Và thật sự, nó thay đổi cách mình nghĩ về distributed computing.
Ray giải quyết bài toán gì mà đáng để quan tâm?
Ray là một AI compute engine nói đơn giản hơn, nó là một distributed runtime cho phép bạn chạy Python code trên nhiều CPU/GPU, nhiều máy, mà gần như không cần thay đổi logic code gốc. Cái hay ở đây là Ray không chỉ dành cho ML. Bất kỳ workload nào viết bằng Python mà bạn muốn parallelize hoặc distribute, Ray đều handle được.
Project này do Anyscale phát triển, hiện có hơn 42.8k stars trên GitHub, và đang được dùng bởi OpenAI, Uber, Spotify, Shopify, Instacart toàn những công ty xử lý data ở scale khổng lồ.
Điểm khác biệt lớn nhất của Ray so với các framework distributed khác là triết lý thiết kế: bạn viết code Python bình thường, thêm vài decorator, và Ray lo phần còn lại. Không cần học một paradigm mới, không cần viết lại application theo kiến trúc MapReduce hay actor model phức tạp.
Cài đặt và chạy thử trong 5 phút
Cài Ray đơn giản không thể tin được:
1pip install ray
Nếu bạn muốn dùng các AI libraries đi kèm (Train, Tune, Serve, Data, RLlib):
1pip install "ray[all]"
Hoặc cài từng component tùy nhu cầu:
1# Chỉ cần Ray Serve cho model serving2pip install "ray[serve]"34# Chỉ cần Ray Tune cho hyperparameter tuning5pip install "ray[tune]"67# Chỉ cần Ray Data cho data processing8pip install "ray[data]"
Bây giờ thử chạy một ví dụ đơn giản. Giả sử bạn có một function tính toán nặng:
1import time23def heavy_computation(x):4 time.sleep(2) # Giả lập tính toán nặng5 return x * x67# Chạy tuần tự - mất 10 giây8results = [heavy_computation(i) for i in range(5)]9print(results) # [0, 1, 4, 9, 16]
Bây giờ, chuyển sang Ray:
1import ray2import time34ray.init() # Khởi tạo Ray runtime56@ray.remote7def heavy_computation(x):8 time.sleep(2) # Giả lập tính toán nặng9 return x * x1011# Chạy parallel - mất ~2 giây thay vì 10 giây12futures = [heavy_computation.remote(i) for i in range(5)]13results = ray.get(futures)14print(results) # [0, 1, 4, 9, 16]
Chỉ thêm @ray.remote và gọi .remote() thay vì gọi trực tiếp. Đó là tất cả. Ray tự động distribute các function call này ra các CPU core available. Và cái đẹp là đoạn code này chạy trên laptop cũng được, chạy trên cluster 100 node cũng được không cần sửa gì.
Ba khái niệm cốt lõi: Tasks, Actors, Objects
Để dùng Ray hiệu quả, bạn cần nắm ba abstraction chính. Mình sẽ đi sâu vào từng cái.
Tasks Stateless parallel functions
Tasks là cái mình vừa demo ở trên. Bất kỳ function nào decorated bằng @ray.remote đều trở thành một Task. Khi bạn gọi .remote(), Ray schedule function đó chạy trên một worker process nào đó trong cluster, và trả về một future (ObjectRef) ngay lập tức thay vì block chờ kết quả.
1import ray2import numpy as np34ray.init()56@ray.remote7def process_batch(batch_data):8 # Xử lý một batch data9 return np.mean(batch_data)1011# Tạo 100 batch, mỗi batch 1 triệu số12batches = [np.random.rand(1_000_000) for _ in range(100)]1314# Submit tất cả tasks cùng lúc15futures = [process_batch.remote(batch) for batch in batches]1617# Chờ tất cả kết quả18results = ray.get(futures)19print(f"Processed {len(results)} batches")
Một điểm hay là bạn có thể specify resource requirements cho mỗi task:
1@ray.remote(num_cpus=2, num_gpus=1)2def train_model(data):3 # Task này cần 2 CPU và 1 GPU4 pass
Actors Stateful distributed objects
Actors là khi bạn cần maintain state. Thay vì decorate function, bạn decorate cả class:
1import ray23ray.init()45@ray.remote6class Counter:7 def __init__(self):8 self.count = 0910 def increment(self):11 self.count += 112 return self.count1314 def get_count(self):15 return self.count1617# Tạo actor instance - chạy như một process riêng18counter = Counter.remote()1920# Gọi methods trên actor21futures = [counter.increment.remote() for _ in range(100)]22results = ray.get(futures)23print(f"Final count: {ray.get(counter.get_count.remote())}") # 100
Theo kinh nghiệm của mình, Actors cực kỳ hữu ích cho các use case như:
- Parameter server trong distributed training
- Shared state giữa các workers (ví dụ: cache, configuration)
- Stateful streaming processors
Objects Distributed shared memory
Ray Objects là immutable values được lưu trong distributed object store. Khi một task return kết quả, nó được lưu vào object store và có thể được access bởi bất kỳ task/actor nào khác mà không cần serialize/deserialize lại qua network nếu cùng node.
1import ray2import numpy as np34ray.init()56# Đưa data lớn vào object store7large_data = np.random.rand(10_000_000)8data_ref = ray.put(large_data) # Trả về ObjectRef910@ray.remote11def process(data_ref):12 data = ray.get(data_ref) # Lấy data từ object store (zero-copy nếu cùng node)13 return np.sum(data)1415# Nhiều tasks cùng dùng chung data mà không duplicate16futures = [process.remote(data_ref) for _ in range(10)]17results = ray.get(futures)
Anh em lưu ý: dùng ray.put() cho data lớn thay vì pass trực tiếp vào .remote(). Nếu pass trực tiếp, Ray sẽ serialize data mỗi lần gọi task, rất tốn memory và bandwidth.
Hệ sinh thái AI Libraries Cái làm Ray thực sự mạnh
Ray Core chỉ là foundation. Sức mạnh thực sự nằm ở bộ AI Libraries được build trên nó:
Mình thấy cái này hay ở chỗ: thay vì phải học và integrate 5-6 tools khác nhau cho ML pipeline, bạn dùng một ecosystem thống nhất. Data processing bằng Ray Data, training bằng Ray Train, tuning bằng Ray Tune, deploy bằng Ray Serve tất cả share cùng một cluster, cùng một cách quản lý resource.
| Library | Chức năng | Tương đương/Thay thế |
|---|---|---|
| **Ray Data** | Distributed data loading & preprocessing | Spark, Dask, tf.data |
| **Ray Train** | Distributed model training | Horovod, PyTorch DDP |
| **Ray Tune** | Hyperparameter tuning | Optuna, HyperOpt |
| **Ray Serve** | Model serving & inference | TF Serving, Triton, BentoML |
| **RLlib** | Reinforcement learning | Stable Baselines, OpenAI Gym |
Ví dụ thực tế: Distributed Training với Ray Train
1import ray2from ray import train3from ray.train.torch import TorchTrainer4from ray.train import ScalingConfig5import torch6import torch.nn as nn78def train_func(config):9 # Code training PyTorch bình thường10 model = nn.Linear(10, 1)11 optimizer = torch.optim.SGD(model.parameters(), lr=config["lr"])1213 # Ray Train tự động wrap model cho distributed training14 model = train.torch.prepare_model(model)1516 for epoch in range(config["epochs"]):17 # Training loop bình thường18 inputs = torch.randn(64, 10)19 targets = torch.randn(64, 1)2021 outputs = model(inputs)22 loss = nn.MSELoss()(outputs, targets)2324 optimizer.zero_grad()25 loss.backward()26 optimizer.step()2728 # Report metrics29 train.report({"loss": loss.item(), "epoch": epoch})3031# Config: chạy trên 4 GPU workers32trainer = TorchTrainer(33 train_func,34 train_loop_config={"lr": 0.01, "epochs": 10},35 scaling_config=ScalingConfig(num_workers=4, use_gpu=True),36)3738result = trainer.fit()39print(f"Best loss: {result.metrics['loss']}")
Điểm mình thích nhất là bạn viết training loop PyTorch hoàn toàn bình thường. Ray Train chỉ cần bạn wrap model bằng prepare_model() và report metrics nó tự handle data parallelism, gradient synchronization, checkpoint management.
Hyperparameter Tuning với Ray Tune
1from ray import tune2from ray.tune.schedulers import ASHAScheduler34def trainable(config):5 for step in range(100):6 # Giả lập training7 score = config["lr"] * 0.1 + config["batch_size"] * 0.018 score += step * 0.0019 tune.report({"score": score, "step": step})1011# Define search space12search_space = {13 "lr": tune.loguniform(1e-4, 1e-1),14 "batch_size": tune.choice([16, 32, 64, 128]),15 "hidden_size": tune.choice([64, 128, 256]),16}1718# ASHA scheduler tự động dừng sớm các trials kém19scheduler = ASHAScheduler(20 max_t=100,21 grace_period=10,22 reduction_factor=2,23)2425tuner = tune.Tuner(26 trainable,27 param_space=search_space,28 tune_config=tune.TuneConfig(29 metric="score",30 mode="max",31 num_samples=50, # Thử 50 combinations32 scheduler=scheduler,33 ),34)3536results = tuner.fit()37best_result = results.get_best_result()38print(f"Best config: {best_result.config}")
Ray Tune chạy 50 trials song song trên tất cả resources available, và ASHA scheduler sẽ tự kill các trials có vẻ không triển vọng để giải phóng resource cho trials khác. Theo kinh nghiệm của mình, việc này giảm thời gian tuning xuống 3-5 lần so với grid search truyền thống.
Deploy lên production với Ray Serve
Đây là phần mình nghĩ nhiều anh em sẽ quan tâm. Ray Serve cho phép bạn deploy ML model (hoặc bất kỳ Python function nào) thành HTTP endpoint:
1from ray import serve2import ray3from starlette.requests import Request45ray.init()67@serve.deployment(num_replicas=3, ray_actor_options={"num_cpus": 1})8class ModelDeployment:9 def __init__(self):10 # Load model một lần khi khởi tạo11 self.model = self._load_model()1213 def _load_model(self):14 # Load model từ file, registry, etc.15 return lambda x: x * 2 # Placeholder1617 async def __call__(self, request: Request):18 data = await request.json()19 result = self.model(data["input"])20 return {"prediction": result}2122# Deploy23serve.run(ModelDeployment.bind())2425# Test26import requests27resp = requests.post("http://localhost:8000", json={"input": 42})28print(resp.json()) # {"prediction": 84}
Cái đẹp của Ray Serve so với các serving framework khác:
Đặc biệt, Ray Serve hỗ trợ model composition rất tự nhiên bạn có thể chain nhiều models lại, thêm business logic ở giữa, tất cả trong Python thuần:
| Feature | Ray Serve | TF Serving | Triton | BentoML |
|---|---|---|---|---|
| Multi-framework (PyTorch, TF, sklearn) | ✅ | ❌ (TF only) | ✅ | ✅ |
| Composition (chaining models) | ✅ Native | ❌ | Limited | ✅ |
| Autoscaling | ✅ | Manual | Manual | ✅ |
| Python-native | ✅ | ❌ (gRPC config) | ❌ (config files) | ✅ |
| Dynamic batching | ✅ | ✅ | ✅ | ✅ |
| Arbitrary Python logic giữa models | ✅ | ❌ | ❌ | Limited |
1@serve.deployment2class Preprocessor:3 def process(self, raw_input):4 return raw_input.lower().strip()56@serve.deployment7class Classifier:8 def predict(self, text):9 return {"label": "positive", "score": 0.95}1011@serve.deployment12class Pipeline:13 def __init__(self, preprocessor, classifier):14 self.preprocessor = preprocessor15 self.classifier = classifier1617 async def __call__(self, request: Request):18 data = await request.json()19 # Chain các models20 cleaned = await self.preprocessor.process.remote(data["text"])21 result = await self.classifier.predict.remote(cleaned)22 return result2324# Bind dependencies25preprocessor = Preprocessor.bind()26classifier = Classifier.bind()27pipeline = Pipeline.bind(preprocessor, classifier)28serve.run(pipeline)
Monitoring với Ray Dashboard
Khi bạn chạy ray.init(), Ray tự động start một dashboard ở http://localhost:8265. Dashboard này cho bạn thấy:
- Cluster overview: bao nhiêu nodes, CPU/GPU utilization
- Jobs: các Ray jobs đang chạy, đã chạy
- Actors: trạng thái của tất cả actors
- Logs: centralized logs từ tất cả workers
- Metrics: Prometheus metrics tích hợp sẵn
Mình khuyên anh em luôn mở dashboard khi develop với Ray. Nó giúp debug cực nhanh nhất là khi bạn gặp các vấn đề như task bị stuck, memory leak, hay resource contention.
Một vài tips từ thực tế
1. Đừng Ray hóa mọi thứ. Nếu function của bạn chạy trong vài millisecond, overhead của việc schedule task qua Ray có thể lớn hơn benefit. Ray phù hợp nhất cho tasks chạy từ vài trăm ms trở lên.
2. Cẩn thận với memory. Mỗi khi bạn gọi .remote() với arguments, Ray serialize arguments đó. Nếu bạn pass một DataFrame 1GB vào 100 tasks, bạn vừa tạo ra 100GB serialized data. Dùng ray.put() để share data.
1# ❌ Sai - serialize data 100 lần2futures = [process.remote(large_df) for _ in range(100)]34# ✅ Đúng - serialize 1 lần, share reference5data_ref = ray.put(large_df)6futures = [process.remote(data_ref) for _ in range(100)]
3. Set num_cpus hợp lý. Mặc định mỗi task dùng 1 CPU. Nếu bạn có task I/O-bound (gọi API, đọc file), set num_cpus=0.25 để Ray schedule nhiều tasks hơn trên cùng một core:
1@ray.remote(num_cpus=0.25) # 4 tasks per CPU core2def call_api(url):3 return requests.get(url).json()
4. Dùng ray.wait() thay vì ray.get() khi cần process results incrementally:
1# Thay vì chờ tất cả xong2results = ray.get(futures) # Block cho đến khi TẤT CẢ xong34# Process từng kết quả khi nó ready5remaining = futures6while remaining:7 done, remaining = ray.wait(remaining, num_returns=1)8 result = ray.get(done[0])9 print(f"Got result: {result}")
Khi nào nên và không nên dùng Ray?
Mình từng thấy team dùng Ray cho mọi thứ kể cả những task đơn giản, và kết quả là complexity tăng mà benefit không đáng kể. Ray mạnh nhất khi bạn thực sự cần scale khi single machine không đủ, hoặc khi bạn cần orchestrate nhiều components phức tạp trong ML pipeline.
| Nên dùng | Không nên dùng |
|---|---|
| ML training trên dataset lớn | CRUD web app đơn giản |
| Hyperparameter tuning | Tasks chạy < 10ms |
| Data preprocessing pipeline | Khi team chưa quen Python |
| Model serving cần autoscale | Khi đã có Spark infrastructure ổn định |
| Batch inference trên nhiều GPU | Single-model inference đơn giản (dùng FastAPI đủ rồi) |
| Reinforcement learning | Khi workload chỉ cần 1 máy là đủ |
Nếu bạn đang làm AI/ML và bắt đầu cảm thấy đau đầu với việc scale, mình nghĩ Ray là một trong những lựa chọn tốt nhất hiện tại. Community lớn, documentation tốt, và cái learning curve ban đầu thực sự rất thấp bạn có thể productive trong vòng một buổi chiều. Cứ pip install ray rồi thử, anh em sẽ thấy.
Nguyễn Nhật Long
@nguyennhatlong1303Nguyễn Nhật Long is a Senior Frontend Engineer and Frontend Team Leader with 7 years of experience building real-time fintech platforms. Specializing in React, Next.js, TypeScript, and React Native, shipping 10+ products across Web, Mobile, Telegram Mini-Apps, and Web3.
Thấy hay? Chia sẻ cho bạn bè!