I'm trying to write a Dataflow pipeline in Python that requires a large numpy matrix as a side input. The matrix is saved in cloud storage. Ideally, each Dataflow worker would load the matrix directly from cloud storage.
My understanding is that if I say

    matrix = np.load(LOCAL_PATH_TO_MATRIX)

and then

    p | "computation" >> beam.Map(computation, matrix)

the matrix gets shipped from my laptop to each Dataflow worker.
How could I instead direct each worker to load the matrix directly from cloud storage? Is there a beam source for "binary blob"?
Your approach is correct.
What Dataflow does, in this case, is handle the NumPy matrix as a side input. This means that it's uploaded once from your machine to the service, and the Dataflow service will send it to each worker.
Given that the matrix is large, this will make your workers use I/O to receive it from the service, and carry the burden of keeping the whole matrix in memory, but it should work.
If you want to avoid computing/loading the matrix on your machine, you can upload your matrix to GCS as a text file, read that file in on each worker, and build the matrix from it. You can do something like this:
    matrix_file = 'gs://mybucket/my/matrix'
    p | beam.ParDo(ComputationDoFn(matrix_file))
And your DoFn could be something like:
    import apache_beam as beam

    class ComputationDoFn(beam.DoFn):
        def __init__(self, matrix_file):
            self._matrix_file = matrix_file
            self._matrix = None

        def start_bundle(self):
            # We check because one DoFn instance may be reused
            # for different bundles.
            if self._matrix is None:
                self._matrix = self.load_matrix(self._matrix_file)

        def process(self, element):
            # Now process the element using self._matrix.
            # Placeholder: emit the element unchanged.
            yield element

        def load_matrix(self, matrix_file):
            # Load the file from GCS using the GCS API
            # and return it as a NumPy array.
            raise NotImplementedError
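To make the `load_matrix` step more concrete, here is a minimal sketch of a standalone helper the DoFn could delegate to. It assumes the matrix was written with `np.save` (binary `.npy` format rather than text) and that Beam's `FileSystems.open` is used to read the `gs://` path. The `open_fn` parameter is a hypothetical hook added only so the helper can be tried without GCS; it is not part of Beam.

```python
import io

import numpy as np


def load_matrix(matrix_file, open_fn=None):
    """Read a .npy file and return its contents as a NumPy array.

    open_fn is a hypothetical hook for illustration: any callable that
    takes a path and returns a readable binary file object.
    """
    if open_fn is None:
        # Assumption: Beam is installed on the worker. FileSystems.open
        # picks the filesystem from the path prefix (e.g. gs://).
        from apache_beam.io.filesystems import FileSystems
        open_fn = FileSystems.open

    f = open_fn(matrix_file)
    try:
        data = f.read()
    finally:
        f.close()

    # np.load expects a seekable file object, so buffer the bytes first.
    return np.load(io.BytesIO(data))
```

Inside the DoFn, `load_matrix` could call this helper and keep the result in `self._matrix`, so each worker reads the file from cloud storage itself rather than receiving the matrix from the machine that launched the pipeline. Note that the whole file is buffered in memory once before NumPy parses it, which matters for a very large matrix.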
I hope this makes sense. I can flesh out the functions if you feel like you need some more help.