Spark - missing 1 required position argument (lamb

2019-08-22 06:48发布

问题:

I'm trying to distribute some text extraction from PDFs between multiple servers using Spark. This is using a custom Python module I made and is an implementation of this question. The 'extractTextFromPdf' function takes 2 arguments: a string representing the path to the file, and a configuration file used to determine various extraction constraints. In this case the config file is just a simple YAML file sitting in the same folder as the Python script running the extraction and the files are just duplicated between Spark servers.

The main issue I have is being able to call my extract function using the filename as the first argument, rather than the file's content. This is the basic script I have as of now, running it on 2 PDFs in the files folder:

#!/usr/bin/env python3

import ScannedTextExtractor.STE as STE

from pyspark import SparkContext
sc = SparkContext("local", "STE")

input = sc.binaryFiles("/home/ubuntu/files")
processed = input.map(lambda filename, content: (STE.extractTextFromPdf(filename,'ste-config.yaml'), content))

print("Results:")
print(processed.take(2))

This creates the lambda error Missing 1 position argument: 'content'. I don't really care about using the PDFs raw content and since the argument to my extraction function is just the path to the PDF, not the actual PDF content itself, I tried to just give 1 argument to the lambda function. e.g.

processed = input.map(lambda filename: STE.extractTextFromPdf(filename,'ste-config.yaml'))

But then I get issues because with this setup Spark sets the PDF content (as a byte stream) as this singular argument, but my module expects a string with the path to the PDF as the first arg, not the whole byte content of the PDF.

I printed the RDD of the binary file loading by the SparkContext and I can see that in there is both the filename and the file content (a byte stream of the PDF) in the RDD. But how do I use it with my custom Python module that expects the following snytax:

STE.extractTextFromPDF('/path/to/pdf','/path/to/config-file')

I've tried multiple permutations of the lambda function, I've triple checked Spark's RDD and SparkContext APIs. I can't seem to get it working.

回答1:

If you only want the path, not the content then you should not use sc.binaryFiles. In that case you should parallelize the paths and then have the Python code load each file individually, as so:

paths = ['/path/to/file1', '/path/to/file2']
input = sc.parallelize(paths)
processed = input.map(lambda path: (path, processFile(path)))

This assumes of course that each executor Python process can access the files directly. This wouldn't work with HDFS or S3 for instance. Can your library not take binary content directly?



回答2:

map takes function of as single argument and your passing a function of two arguments:

 input.map(lambda filename, content: (STE.extractTextFromPdf(filename,'ste-config.yaml'), content)

Use either

input.map(lambda fc: (STE.extractTextFromPdf(fc[0],'ste-config.yaml'), fc[1])

or

def process(x):
    filename, content = x
    return STE.extractTextFromPdf(filename,'ste-config.yaml'), content

Not that it won't work in general unless:

  • STE.extractTextFromPdf can use Hadoop compliant file system or
  • You use POSIX compliant file system.

If it doesn't you can try :

  • Using pseudofiles like io.BytesIO (if it supports reading from file-like objects at some level).
  • Write content to a temporary file on a local FS and read it from there.