Nâng cao
10 phút đọc3 tháng 6, 20261

Ray: Scale Python code từ laptop lên cluster mà không cần đổi gì

Tìm hiểu Ray engine giúp bạn scale ứng dụng AI/ML từ máy local lên hàng trăm node chỉ với vài dòng decorator, kèm hướng dẫn thực hành chi tiết.

N

Nguyễn Nhật Long

@nguyennhatlong1303

Ray: Scale Python code từ laptop lên cluster mà không cần đổi gì

Ray: Scale Python code từ laptop lên cluster mà không cần đổi gì

Mình nhớ hồi mới bắt đầu làm ML pipeline cho một dự án recommendation engine, cái đau đầu nhất không phải là chọn model hay tune hyperparameter mà là lúc model chạy ngon trên laptop rồi, giờ muốn train trên dataset lớn hơn gấp 100 lần thì phải làm sao. Viết lại bằng Spark? Setup Dask cluster? Hay tự tay viết multiprocessing rồi coordinate giữa các node? Mỗi option đều có trade-off riêng, và cái nào cũng đòi hỏi phải refactor code đáng kể.

Rồi mình gặp Ray. Và thật sự, nó thay đổi cách mình nghĩ về distributed computing.

Ray giải quyết bài toán gì mà đáng để quan tâm?

Ray là một AI compute engine nói đơn giản hơn, nó là một distributed runtime cho phép bạn chạy Python code trên nhiều CPU/GPU, nhiều máy, mà gần như không cần thay đổi logic code gốc. Cái hay ở đây là Ray không chỉ dành cho ML. Bất kỳ workload nào viết bằng Python mà bạn muốn parallelize hoặc distribute, Ray đều handle được.

Project này do Anyscale phát triển, hiện có hơn 42.8k stars trên GitHub, và đang được dùng bởi OpenAI, Uber, Spotify, Shopify, Instacart toàn những công ty xử lý data ở scale khổng lồ.

Điểm khác biệt lớn nhất của Ray so với các framework distributed khác là triết lý thiết kế: bạn viết code Python bình thường, thêm vài decorator, và Ray lo phần còn lại. Không cần học một paradigm mới, không cần viết lại application theo kiến trúc MapReduce hay actor model phức tạp.

Cài đặt và chạy thử trong 5 phút

Cài Ray đơn giản không thể tin được:

Terminal
1pip install ray

Nếu bạn muốn dùng các AI libraries đi kèm (Train, Tune, Serve, Data, RLlib):

Terminal
1pip install "ray[all]"

Hoặc cài từng component tùy nhu cầu:

Terminal
1# Chỉ cần Ray Serve cho model serving
2pip install "ray[serve]"
3
4# Chỉ cần Ray Tune cho hyperparameter tuning
5pip install "ray[tune]"
6
7# Chỉ cần Ray Data cho data processing
8pip install "ray[data]"

Bây giờ thử chạy một ví dụ đơn giản. Giả sử bạn có một function tính toán nặng:

Python
1import time
2
3def heavy_computation(x):
4 time.sleep(2) # Giả lập tính toán nặng
5 return x * x
6
7# Chạy tuần tự - mất 10 giây
8results = [heavy_computation(i) for i in range(5)]
9print(results) # [0, 1, 4, 9, 16]

Bây giờ, chuyển sang Ray:

Python
1import ray
2import time
3
4ray.init() # Khởi tạo Ray runtime
5
6@ray.remote
7def heavy_computation(x):
8 time.sleep(2) # Giả lập tính toán nặng
9 return x * x
10
11# Chạy parallel - mất ~2 giây thay vì 10 giây
12futures = [heavy_computation.remote(i) for i in range(5)]
13results = ray.get(futures)
14print(results) # [0, 1, 4, 9, 16]

Chỉ thêm @ray.remote và gọi .remote() thay vì gọi trực tiếp. Đó là tất cả. Ray tự động distribute các function call này ra các CPU core available. Và cái đẹp là đoạn code này chạy trên laptop cũng được, chạy trên cluster 100 node cũng được không cần sửa gì.

Ba khái niệm cốt lõi: Tasks, Actors, Objects

Để dùng Ray hiệu quả, bạn cần nắm ba abstraction chính. Mình sẽ đi sâu vào từng cái.

Tasks Stateless parallel functions

Tasks là cái mình vừa demo ở trên. Bất kỳ function nào decorated bằng @ray.remote đều trở thành một Task. Khi bạn gọi .remote(), Ray schedule function đó chạy trên một worker process nào đó trong cluster, và trả về một future (ObjectRef) ngay lập tức thay vì block chờ kết quả.

Python
1import ray
2import numpy as np
3
4ray.init()
5
6@ray.remote
7def process_batch(batch_data):
8 # Xử lý một batch data
9 return np.mean(batch_data)
10
11# Tạo 100 batch, mỗi batch 1 triệu số
12batches = [np.random.rand(1_000_000) for _ in range(100)]
13
14# Submit tất cả tasks cùng lúc
15futures = [process_batch.remote(batch) for batch in batches]
16
17# Chờ tất cả kết quả
18results = ray.get(futures)
19print(f"Processed {len(results)} batches")

Một điểm hay là bạn có thể specify resource requirements cho mỗi task:

Python
1@ray.remote(num_cpus=2, num_gpus=1)
2def train_model(data):
3 # Task này cần 2 CPU và 1 GPU
4 pass

Actors Stateful distributed objects

Actors là khi bạn cần maintain state. Thay vì decorate function, bạn decorate cả class:

Python
1import ray
2
3ray.init()
4
5@ray.remote
6class Counter:
7 def __init__(self):
8 self.count = 0
9
10 def increment(self):
11 self.count += 1
12 return self.count
13
14 def get_count(self):
15 return self.count
16
17# Tạo actor instance - chạy như một process riêng
18counter = Counter.remote()
19
20# Gọi methods trên actor
21futures = [counter.increment.remote() for _ in range(100)]
22results = ray.get(futures)
23print(f"Final count: {ray.get(counter.get_count.remote())}") # 100

Theo kinh nghiệm của mình, Actors cực kỳ hữu ích cho các use case như:

  • Parameter server trong distributed training
  • Shared state giữa các workers (ví dụ: cache, configuration)
  • Stateful streaming processors

Objects Distributed shared memory

Ray Objects là immutable values được lưu trong distributed object store. Khi một task return kết quả, nó được lưu vào object store và có thể được access bởi bất kỳ task/actor nào khác mà không cần serialize/deserialize lại qua network nếu cùng node.

Python
1import ray
2import numpy as np
3
4ray.init()
5
6# Đưa data lớn vào object store
7large_data = np.random.rand(10_000_000)
8data_ref = ray.put(large_data) # Trả về ObjectRef
9
10@ray.remote
11def process(data_ref):
12 data = ray.get(data_ref) # Lấy data từ object store (zero-copy nếu cùng node)
13 return np.sum(data)
14
15# Nhiều tasks cùng dùng chung data mà không duplicate
16futures = [process.remote(data_ref) for _ in range(10)]
17results = ray.get(futures)

Anh em lưu ý: dùng ray.put() cho data lớn thay vì pass trực tiếp vào .remote(). Nếu pass trực tiếp, Ray sẽ serialize data mỗi lần gọi task, rất tốn memory và bandwidth.

Hệ sinh thái AI Libraries Cái làm Ray thực sự mạnh

Ray Core chỉ là foundation. Sức mạnh thực sự nằm ở bộ AI Libraries được build trên nó:

Mình thấy cái này hay ở chỗ: thay vì phải học và integrate 5-6 tools khác nhau cho ML pipeline, bạn dùng một ecosystem thống nhất. Data processing bằng Ray Data, training bằng Ray Train, tuning bằng Ray Tune, deploy bằng Ray Serve tất cả share cùng một cluster, cùng một cách quản lý resource.

LibraryChức năngTương đương/Thay thế
**Ray Data**Distributed data loading & preprocessingSpark, Dask, tf.data
**Ray Train**Distributed model trainingHorovod, PyTorch DDP
**Ray Tune**Hyperparameter tuningOptuna, HyperOpt
**Ray Serve**Model serving & inferenceTF Serving, Triton, BentoML
**RLlib**Reinforcement learningStable Baselines, OpenAI Gym

Ví dụ thực tế: Distributed Training với Ray Train

Python
1import ray
2from ray import train
3from ray.train.torch import TorchTrainer
4from ray.train import ScalingConfig
5import torch
6import torch.nn as nn
7
8def train_func(config):
9 # Code training PyTorch bình thường
10 model = nn.Linear(10, 1)
11 optimizer = torch.optim.SGD(model.parameters(), lr=config["lr"])
12
13 # Ray Train tự động wrap model cho distributed training
14 model = train.torch.prepare_model(model)
15
16 for epoch in range(config["epochs"]):
17 # Training loop bình thường
18 inputs = torch.randn(64, 10)
19 targets = torch.randn(64, 1)
20
21 outputs = model(inputs)
22 loss = nn.MSELoss()(outputs, targets)
23
24 optimizer.zero_grad()
25 loss.backward()
26 optimizer.step()
27
28 # Report metrics
29 train.report({"loss": loss.item(), "epoch": epoch})
30
31# Config: chạy trên 4 GPU workers
32trainer = TorchTrainer(
33 train_func,
34 train_loop_config={"lr": 0.01, "epochs": 10},
35 scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
36)
37
38result = trainer.fit()
39print(f"Best loss: {result.metrics['loss']}")

Điểm mình thích nhất là bạn viết training loop PyTorch hoàn toàn bình thường. Ray Train chỉ cần bạn wrap model bằng prepare_model() và report metrics nó tự handle data parallelism, gradient synchronization, checkpoint management.

Hyperparameter Tuning với Ray Tune

Python
1from ray import tune
2from ray.tune.schedulers import ASHAScheduler
3
4def trainable(config):
5 for step in range(100):
6 # Giả lập training
7 score = config["lr"] * 0.1 + config["batch_size"] * 0.01
8 score += step * 0.001
9 tune.report({"score": score, "step": step})
10
11# Define search space
12search_space = {
13 "lr": tune.loguniform(1e-4, 1e-1),
14 "batch_size": tune.choice([16, 32, 64, 128]),
15 "hidden_size": tune.choice([64, 128, 256]),
16}
17
18# ASHA scheduler tự động dừng sớm các trials kém
19scheduler = ASHAScheduler(
20 max_t=100,
21 grace_period=10,
22 reduction_factor=2,
23)
24
25tuner = tune.Tuner(
26 trainable,
27 param_space=search_space,
28 tune_config=tune.TuneConfig(
29 metric="score",
30 mode="max",
31 num_samples=50, # Thử 50 combinations
32 scheduler=scheduler,
33 ),
34)
35
36results = tuner.fit()
37best_result = results.get_best_result()
38print(f"Best config: {best_result.config}")

Ray Tune chạy 50 trials song song trên tất cả resources available, và ASHA scheduler sẽ tự kill các trials có vẻ không triển vọng để giải phóng resource cho trials khác. Theo kinh nghiệm của mình, việc này giảm thời gian tuning xuống 3-5 lần so với grid search truyền thống.

Deploy lên production với Ray Serve

Đây là phần mình nghĩ nhiều anh em sẽ quan tâm. Ray Serve cho phép bạn deploy ML model (hoặc bất kỳ Python function nào) thành HTTP endpoint:

Python
1from ray import serve
2import ray
3from starlette.requests import Request
4
5ray.init()
6
7@serve.deployment(num_replicas=3, ray_actor_options={"num_cpus": 1})
8class ModelDeployment:
9 def __init__(self):
10 # Load model một lần khi khởi tạo
11 self.model = self._load_model()
12
13 def _load_model(self):
14 # Load model từ file, registry, etc.
15 return lambda x: x * 2 # Placeholder
16
17 async def __call__(self, request: Request):
18 data = await request.json()
19 result = self.model(data["input"])
20 return {"prediction": result}
21
22# Deploy
23serve.run(ModelDeployment.bind())
24
25# Test
26import requests
27resp = requests.post("http://localhost:8000", json={"input": 42})
28print(resp.json()) # {"prediction": 84}

Cái đẹp của Ray Serve so với các serving framework khác:

Đặc biệt, Ray Serve hỗ trợ model composition rất tự nhiên bạn có thể chain nhiều models lại, thêm business logic ở giữa, tất cả trong Python thuần:

FeatureRay ServeTF ServingTritonBentoML
Multi-framework (PyTorch, TF, sklearn)❌ (TF only)
Composition (chaining models)✅ NativeLimited
AutoscalingManualManual
Python-native❌ (gRPC config)❌ (config files)
Dynamic batching
Arbitrary Python logic giữa modelsLimited
Python
1@serve.deployment
2class Preprocessor:
3 def process(self, raw_input):
4 return raw_input.lower().strip()
5
6@serve.deployment
7class Classifier:
8 def predict(self, text):
9 return {"label": "positive", "score": 0.95}
10
11@serve.deployment
12class Pipeline:
13 def __init__(self, preprocessor, classifier):
14 self.preprocessor = preprocessor
15 self.classifier = classifier
16
17 async def __call__(self, request: Request):
18 data = await request.json()
19 # Chain các models
20 cleaned = await self.preprocessor.process.remote(data["text"])
21 result = await self.classifier.predict.remote(cleaned)
22 return result
23
24# Bind dependencies
25preprocessor = Preprocessor.bind()
26classifier = Classifier.bind()
27pipeline = Pipeline.bind(preprocessor, classifier)
28serve.run(pipeline)

Monitoring với Ray Dashboard

Khi bạn chạy ray.init(), Ray tự động start một dashboard ở http://localhost:8265. Dashboard này cho bạn thấy:

  • Cluster overview: bao nhiêu nodes, CPU/GPU utilization
  • Jobs: các Ray jobs đang chạy, đã chạy
  • Actors: trạng thái của tất cả actors
  • Logs: centralized logs từ tất cả workers
  • Metrics: Prometheus metrics tích hợp sẵn

Mình khuyên anh em luôn mở dashboard khi develop với Ray. Nó giúp debug cực nhanh nhất là khi bạn gặp các vấn đề như task bị stuck, memory leak, hay resource contention.

Một vài tips từ thực tế

1. Đừng Ray hóa mọi thứ. Nếu function của bạn chạy trong vài millisecond, overhead của việc schedule task qua Ray có thể lớn hơn benefit. Ray phù hợp nhất cho tasks chạy từ vài trăm ms trở lên.

2. Cẩn thận với memory. Mỗi khi bạn gọi .remote() với arguments, Ray serialize arguments đó. Nếu bạn pass một DataFrame 1GB vào 100 tasks, bạn vừa tạo ra 100GB serialized data. Dùng ray.put() để share data.

Python
1# ❌ Sai - serialize data 100 lần
2futures = [process.remote(large_df) for _ in range(100)]
3
4# ✅ Đúng - serialize 1 lần, share reference
5data_ref = ray.put(large_df)
6futures = [process.remote(data_ref) for _ in range(100)]

3. Set num_cpus hợp lý. Mặc định mỗi task dùng 1 CPU. Nếu bạn có task I/O-bound (gọi API, đọc file), set num_cpus=0.25 để Ray schedule nhiều tasks hơn trên cùng một core:

Python
1@ray.remote(num_cpus=0.25) # 4 tasks per CPU core
2def call_api(url):
3 return requests.get(url).json()

4. Dùng ray.wait() thay vì ray.get() khi cần process results incrementally:

Python
1# Thay vì chờ tất cả xong
2results = ray.get(futures) # Block cho đến khi TẤT CẢ xong
3
4# Process từng kết quả khi nó ready
5remaining = futures
6while remaining:
7 done, remaining = ray.wait(remaining, num_returns=1)
8 result = ray.get(done[0])
9 print(f"Got result: {result}")

Khi nào nên và không nên dùng Ray?

Mình từng thấy team dùng Ray cho mọi thứ kể cả những task đơn giản, và kết quả là complexity tăng mà benefit không đáng kể. Ray mạnh nhất khi bạn thực sự cần scale khi single machine không đủ, hoặc khi bạn cần orchestrate nhiều components phức tạp trong ML pipeline.

Nên dùngKhông nên dùng
ML training trên dataset lớnCRUD web app đơn giản
Hyperparameter tuningTasks chạy < 10ms
Data preprocessing pipelineKhi team chưa quen Python
Model serving cần autoscaleKhi đã có Spark infrastructure ổn định
Batch inference trên nhiều GPUSingle-model inference đơn giản (dùng FastAPI đủ rồi)
Reinforcement learningKhi workload chỉ cần 1 máy là đủ

Nếu bạn đang làm AI/ML và bắt đầu cảm thấy đau đầu với việc scale, mình nghĩ Ray là một trong những lựa chọn tốt nhất hiện tại. Community lớn, documentation tốt, và cái learning curve ban đầu thực sự rất thấp bạn có thể productive trong vòng một buổi chiều. Cứ pip install ray rồi thử, anh em sẽ thấy.

NN

Nguyễn Nhật Long

@nguyennhatlong1303

Nguyễn Nhật Long is a Senior Frontend Engineer and Frontend Team Leader with 7 years of experience building real-time fintech platforms. Specializing in React, Next.js, TypeScript, and React Native, shipping 10+ products across Web, Mobile, Telegram Mini-Apps, and Web3.

Thấy hay? Chia sẻ cho bạn bè!