Executing the following will not work concurrently, instead it will first execute Run1 and block until it's completed, before it will execute Run2.
@ray.remote
class Test:
def __init__(self):
pass
def Run1(self):
print('Run1 Start')
sleep(5)
print('Run1 End')
def Run2(self):
print('Run2')
ray.init()
test = Test.remote()
test.Run1.remote()
test.Run2.remote()
sleep(10)
Output:
(pid=8109) Run1 Start
(pid=8109) Run1 End
(pid=8109) Run2
This is a bit unexpected. How can I enforce that the methods get executed concurrently?
EDIT TO ADDRESS THE FOLLOW UP COMMENTS:
Doing a dual threaded approach doesn't seem to work. The below code consistently results in broken pipes from PyArrow. I'd like to run both, the self.PreloadSamples method as well as the self.Optimize methods consistently in parallel. The BufferActor class collects and provides batched samples through the @ray.remote decorated GetSamples() method. Since data on GPU is not serializable, this needs to be done on the Optimizer object side, and I want to make sure that this gets done in parallel and not sequentially with respect to the optimization.
See below for a fully isolated version of the problem that replicates the issues after about 1 minute of running:
import torch
import ray
import threading
from time import sleep
def Threaded(fn):
def wrapper(*args, **kwargs):
thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
thread.start()
return thread
return wrapper
@ray.remote
class BufferActor():
def __init__(self):
pass
def GetSamples(self):
return torch.randn(32, 100)
@ray.remote(num_gpus=1)
class OptimizerActor():
def __init__(self, bufferActor):
self.bufferActor = bufferActor
self.samplesOnGPU = list()
self.PreloadSamples()
self.Optimize()
@Threaded
def PreloadSamples(self):
#this retrieves a batch of samples (in numpy/torch format on CPU)
if (len(self.samplesOnGPU) < 5):
samples = ray.get(self.bufferActor.GetSamples.remote())
self.samplesOnGPU.append(samples.to('cuda'))
print('Samples Buffer: %s' % len(self.samplesOnGPU))
else:
sleep(0.01)
self.PreloadSamples()
@Threaded
def Optimize(self):
if (len(self.samplesOnGPU) > 0):
samples = self.samplesOnGPU.pop(0)
print('Optimizing')
#next we perform loss calc + backprop + optimizer step (not shown)
sleep(0.01)
self.Optimize()
ray.init()
bufferActor = BufferActor.remote()
optimizerActor = OptimizerActor.remote(bufferActor)
sleep(60*60)
Actors will execute one method at a time to avoid concurrency issues. If you want parallelism with actors (which you normally do), the best way is to start two (or more) actors and submit tasks to them both.