How can I incorporate the current input filename into my Pig script?

Posted 2020-02-23 08:02

I am processing data from a set of files which contain a date stamp as part of the filename. The data within the file does not contain the date stamp. I would like to process the filename and add it to one of the data structures within the script. Is there a way to do that within Pig Latin (an extension to PigStorage maybe?) or do I need to preprocess all of the files using Perl or the like beforehand?

I envision something like the following:

-- Load two fields from file, then generate a third from the filename
rawdata = LOAD '/directory/of/files/' USING PigStorage AS (field1:chararray, field2:int, field3:filename);

-- Reformat the filename into a datestamp
annotated = FOREACH rawdata GENERATE
  REGEX_EXTRACT(field3, '.*-(20\\d{6})-.*', 1) AS datestamp,
  field1, field2;

Note the special "filename" datatype in the LOAD statement. Seems like it would have to happen there as once the data has been loaded it's too late to get back to the source filename.
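As a sanity check on the regex envisioned above, extracting the datestamp is a one-liner once the filename is available as a field. A minimal Python sketch, where the `...-20YYMMDD-...` filename layout and the sample name are assumptions for illustration:

```python
import re

# Assumed filename layout: an 8-digit datestamp starting with "20",
# set off by hyphens, e.g. "metrics-20200223-host1.csv".
def extract_datestamp(filename):
    """Return the datestamp portion of the filename, or None."""
    match = re.search(r'-(20\d{6})-', filename)
    return match.group(1) if match else None

print(extract_datestamp('metrics-20200223-host1.csv'))  # -> 20200223
```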

Tags: apache-pig
4 answers
Juvenile、少年°
Answer 2 · 2020-02-23 08:26

A way to do this in Bash and PigLatin can be found at: How Can I Load Every File In a Folder Using PIG?.

What I've been doing lately though, and find to be much cleaner, is embedding Pig in Python. That lets you throw all sorts of variables and such between the two. A simple example is:

#!/path/to/jython.jar                                    

# explicitly import Pig class                                                                         
from org.apache.pig.scripting import Pig

# COMPILE: compile method returns a Pig object that represents the pipeline                           
P = Pig.compile(
               "a = load '$in'; store a into '$out';")

input = '/path/to/some/file.txt'
output = '/path/to/some/output/on/hdfs'

# BIND and RUN                                                                                        
results = P.bind({'in':input, 'out':output}).runSingle()

if results.isSuccessful():
    print 'Pig job succeeded'
else:
    raise Exception('Pig job failed')

Have a look at Julien Le Dem's great slides as an introduction to this, if you're interested. There's also a ton of documentation at http://pig.apache.org/docs/r0.9.2/cont.pdf.

我只想做你的唯一
Answer 3 · 2020-02-23 08:31

-tagsource is deprecated in Pig 0.12.0. Instead use:

-tagFile - Appends input source file name to beginning of each tuple.
-tagPath - Appends input source file path to beginning of each tuple.

A = LOAD '/user/myFile.TXT' using PigStorage(',','-tagPath');
DUMP A;

will give you the full file path as the first column:

( hdfs://myserver/user/blo/input/2015.TXT,439,43,05,4,NAVI,PO,P&C,P&CR,UC,40)

Reference: http://pig.apache.org/docs/r0.12.0/api/org/apache/pig/builtin/PigStorage.html
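Downstream of Pig, splitting that tagged first column off a record is simple. A Python sketch, assuming the comma-separated layout shown in the dump above (the record string here is an illustrative sample, not real output):

```python
import posixpath

# A -tagPath record: the HDFS path is prepended as the first
# comma-separated field (illustrative sample record).
record = 'hdfs://myserver/user/blo/input/2015.TXT,439,43,05,4'

path, rest = record.split(',', 1)     # peel off the path column
filename = posixpath.basename(path)   # just '2015.TXT'
fields = rest.split(',')              # the remaining data columns
print(filename, fields)
```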

家丑人穷心不美
Answer 4 · 2020-02-23 08:41

You can use PigStorage by specifying -tagsource as follows:

A = LOAD 'input' using PigStorage(',','-tagsource');
B = foreach A generate $0 AS INPUT_FILE_NAME;

The first field ($0) in each tuple will contain the input file path (INPUT_FILE_NAME).

According to the API doc: http://pig.apache.org/docs/r0.10.0/api/org/apache/pig/builtin/PigStorage.html

Dan

倾城 Initia
Answer 5 · 2020-02-23 08:45

The Pig wiki has an example of PigStorageWithInputPath, which adds the filename in an additional chararray field:

Example

A = load '/directory/of/files/*' using PigStorageWithInputPath() 
    as (field1:chararray, field2:int, field3:chararray);

UDF

// Note that there are several versions of Path and FileSplit. These are intended:
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;

public class PigStorageWithInputPath extends PigStorage {
   Path path = null;

   @Override
   public void prepareToRead(RecordReader reader, PigSplit split) {
       super.prepareToRead(reader, split);
       path = ((FileSplit)split.getWrappedSplit()).getPath();
   }

   @Override
   public Tuple getNext() throws IOException {
       Tuple myTuple = super.getNext();
       if (myTuple != null)
          myTuple.append(path.toString());
       return myTuple;
   }
}