Void’s Vault

Knowledge source for efficiency.

MapReduce File Manipulation Using Pig Scripts

In this article, I will present Pig, a scripting language used with Hadoop. I had to learn Pig for a project I worked on with Mathieu Dumoulin. Pig's syntax is very similar to SQL's, but it lets you manipulate big data in MapReduce mode quite easily.

First, you can find good tutorials, as well as an installation guide, on the Apache website.

Now, I will show the different ways to implement a Pig script.

Here’s an overview of the project I’m working on with Mathieu Dumoulin. We have two datasets: the first contains about 1000 labeled examples, and the second is a massive stack of 500k+ unlabeled examples. We wanted to know whether it is possible to learn from the first dataset in order to find examples in the second dataset that could be considered confident positives or confident negatives. We could then add these found examples to the first dataset, and use another learning algorithm to learn over the enriched dataset. We want to see whether this strategy will indeed improve the learning performance.

Now, one of the Pig scripts we wrote has the following purpose: given a dataset and a trained dual truncated perceptron, we retrieve a label for every example in the dataset. Each label is a double value. We saved the dataset in a file with the following format:

0.2,0,0,3,22,1,1032,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,511,511,0.0,0.0,0.0,0.0,1.0,0.0,0.0,255.0,255.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0
0.3,0,0,1,53,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,250,15,1.0,1.0,0.0,0.0,0.06,0.06,0.0,255.0,4.0,0.02,0.07,0.0,0.0,1.0,1.0,0.0,0.0
-0.5,0,0,1,53,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,279,14,1.0,1.0,0.0,0.0,0.05,0.05,0.0,255.0,14.0,0.05,0.05,0.0,0.0,1.0,1.0,0.0,0.0
0.7,1,0,1,18,1,1313,325,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,2,0.0,0.0,0.0,0.0,1.0,0.0,1.0,112.0,56.0,0.5,0.04,0.01,0.0,0.01,0.02,0.01,0.0
0.2,0,0,3,22,1,1032,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,511,511,0.0,0.0,0.0,0.0,1.0,0.0,0.0,255.0,255.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0
0.8,0,0,3,22,1,1032,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,511,511,0.0,0.0,0.0,0.0,1.0,0.0,0.0,255.0,255.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0
-0.9,0,0,1,53,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,103,3,1.0,1.0,0.0,0.0,0.03,0.07,0.0,255.0,3.0,0.01,0.06,0.0,0.0,1.0,1.0,0.0,0.0
0.4,0,0,3,22,1,1032,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,511,511,0.0,0.0,0.0,0.0,1.0,0.0,0.0,255.0,255.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0
0.1,0,0,3,22,1,1032,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,510,510,0.0,0.0,0.0,0.0,1.0,0.0,0.0,255.0,255.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0
-0.6,1,0,2,26,1,145,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0.0,0.0,0.0,0.0,1.0,0.0,0.0,255.0,1.0,0.0,0.84,1.0,0.0,0.0,0.0,0.0,0.0

Every row is an example: column 0 contains the label and the other columns represent the example itself. The script imports the file, sorts the data using the first column, and returns a defined percentage of the top positive examples and top negative examples.
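To make the layout concrete, here is a small plain-Java sketch (hypothetical helper names, not part of our project) that splits one such CSV row into its label and its feature columns:

```java
// Hypothetical helper illustrating the file layout described above:
// column 0 is the confidence label, the remaining columns are the features.
public class LabeledRow {

    static double labelOf(String csvLine) {
        // The label is the first comma-separated value
        return Double.parseDouble(csvLine.split(",")[0]);
    }

    static double[] featuresOf(String csvLine) {
        // Everything after the first value is a feature
        String[] fields = csvLine.split(",");
        double[] features = new double[fields.length - 1];
        for (int i = 1; i < fields.length; i++) {
            features[i - 1] = Double.parseDouble(fields[i]);
        }
        return features;
    }

    public static void main(String[] args) {
        String row = "-0.5,0,0,1,53";
        System.out.println(labelOf(row));            // -0.5
        System.out.println(featuresOf(row).length);  // 4
    }
}
```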

Indeed, this is a small script, but presenting one of our biggest scripts would not have added much more value, since Apache’s tutorials are very complete.

We implemented the script in several ways, since our needs changed over time. Thus, I will now show two different ways to implement a Pig script.

The first method is to create a .pig script file. That way, you can call it via a bash command. Here’s the .pig script:

-- Load the dataset; $INPUT is bound on the command line
confidence_data = LOAD '$INPUT';
confidenceGroup = GROUP confidence_data ALL;
confidentCount = FOREACH confidenceGroup GENERATE COUNT(confidence_data);

-- Split the dataset by the sign of the label in column 0
positives = FILTER confidence_data BY $0 > 0.0;
negatives = FILTER confidence_data BY $0 < 0.0;

positivesGroup = GROUP positives ALL;
positivesCount = FOREACH positivesGroup GENERATE COUNT(positives);

negativesGroup = GROUP negatives ALL;
negativesCount = FOREACH negativesGroup GENERATE COUNT(negatives);

-- Keep the top $RATIO fraction of each side, then drop the label column
positivesOrdered = ORDER positives BY $0 DESC;
positivesFiltered = LIMIT positivesOrdered (int) ((positivesCount.$0) * $RATIO + 1);
positivesFiltered = FOREACH positivesFiltered GENERATE $1..;

negativesOrdered = ORDER negatives BY $0 DESC;
negativesFiltered = LIMIT negativesOrdered (int) ((negativesCount.$0) * $RATIO + 1);
negativesFiltered = FOREACH negativesFiltered GENERATE $1..;

allConfidents = UNION positivesFiltered, negativesFiltered;

STORE allConfidents INTO '$OUTPUT';

You can easily understand what this script does. The FILTER command retrieves the rows matching a certain condition; here, we split the dataset by separating the positively labeled and the negatively labeled examples. GENERATE COUNT is the way to count the number of rows of a set. LIMIT retrieves the first rows of a set, and we used a percentage in the $RATIO parameter to keep the top X% of rows of a set. The command “GENERATE $1..” has the purpose of removing the label. If we had written “GENERATE $1..$3” instead, we would have retrieved only the first three feature columns of an example, but omitting the ending value of the range allows us to retrieve every desired column without having to care about the total number of columns. Finally, we store the union of the retrieved examples into an output directory, and Pig manages the way it is stored by itself.
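Since the selection logic is easy to get wrong, here is a plain-Java simulation (hypothetical class name, not part of our project) of what the script computes; for brevity each row is represented only by its label, and the real work is of course done by Pig:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Mirrors the Pig script: split by label sign, order by label descending,
// then keep the top (int)(count * ratio + 1) rows of each side.
public class TopKPercentSketch {

    static List<Double> topKPercent(List<Double> labels, double ratio) {
        List<Double> positives = new ArrayList<Double>();
        List<Double> negatives = new ArrayList<Double>();
        for (double label : labels) {
            if (label > 0.0) positives.add(label);       // FILTER ... BY $0 > 0.0
            else if (label < 0.0) negatives.add(label);  // FILTER ... BY $0 < 0.0
        }
        // ORDER ... BY $0 DESC on each side
        Collections.sort(positives, Collections.<Double>reverseOrder());
        Collections.sort(negatives, Collections.<Double>reverseOrder());
        // LIMIT ... (int)(count * ratio + 1), then UNION the two sides
        List<Double> kept = new ArrayList<Double>();
        kept.addAll(positives.subList(0,
                Math.min(positives.size(), (int) (positives.size() * ratio + 1))));
        kept.addAll(negatives.subList(0,
                Math.min(negatives.size(), (int) (negatives.size() * ratio + 1))));
        return kept;
    }

    public static void main(String[] args) {
        // The labels from the sample file above
        List<Double> labels = Arrays.asList(0.2, 0.3, -0.5, 0.7, 0.8, -0.9, 0.4, 0.1, -0.6);
        System.out.println(topKPercent(labels, 0.3)); // [0.8, 0.7, -0.5]
    }
}
```

With 6 positives and ratio 0.3, (int)(6 * 0.3 + 1) = 2 rows are kept; with 3 negatives, (int)(3 * 0.3 + 1) = 1 row is kept.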

There are also two useful Pig commands that I need to mention. “REGISTER ‘path/to/file.jar’;” allows one to use UDFs (user-defined functions). Also, “DEFINE MY_FUNC com.apache.a.package.to.MyFunction();” defines an alias so MY_FUNC can be used in a Pig script instead of writing a long package prefix.

Here’s how to call the Pig script in bash. Note that the script can be called in local mode as well as in MapReduce mode without any alteration of the code, which is very interesting.

# To run the script in local mode
pig -x local -param INPUT='path/to/dataset.tar.gz' -param RATIO=0.3 -param OUTPUT='path/to/output/folder/' TopKPercent.pig

# To run the script in mapreduce mode
pig -x mapreduce -param INPUT='path/to/dataset.tar.gz' -param RATIO=0.3 -param OUTPUT='path/to/output/folder/' TopKPercent.pig

# Note that Pig can accept a lot of datafiles, not only compressed files. 
# I just wanted to show that compressed files like tar.gz are accepted.

# If you want to enter queries line by line, use pig's command line called grunt
pig -x local

Next, I present a nice way to write Pig scripts in Java. This will allow one to use a Pig script in a Java application. Using this technique, you don’t have to write any .pig file, since the query is registered via multiple input strings in Java.

package com.fujitsu.ca.fic.utils.pig;

import java.io.IOException;

import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;

public class TopKWithPig {

    private static final String MAPREDUCE_MODE = "mapreduce";
    private static final String LOCAL_MODE = "local";

    public void filterWithMapReduce(String inputFileName, String outputDir, double keepRatio) throws ExecException, IOException {
        filterTopK(MAPREDUCE_MODE, inputFileName, outputDir, keepRatio);
    }

    public void filterLocally(String inputFileName, String outputDir, double keepRatio) throws ExecException, IOException {
        filterTopK(LOCAL_MODE, inputFileName, outputDir, keepRatio);
    }

    private void filterTopK(String mode, String inputFileName, String pigOutputDir, double keepRatio) throws ExecException, IOException {
        PigServer pigServer = null;

        try {
            pigServer = new PigServer(mode);
            runIdQuery(pigServer, inputFileName, pigOutputDir, String.valueOf(keepRatio));
        } finally {
            if (pigServer != null) {
                pigServer.shutdown();
            }
        }
    }

    private void runIdQuery(PigServer pigServer, String inputFile, String outputDir, String keepRatio) throws IOException {
        pigServer.registerQuery("confidence_data = LOAD '" + inputFile + "';");
        pigServer.registerQuery("confidenceGroup = GROUP confidence_data ALL;");
        pigServer.registerQuery("confidentCount = FOREACH confidenceGroup GENERATE COUNT(confidence_data);");
        pigServer.registerQuery("positives = FILTER confidence_data BY $0 > 0.0;");
        pigServer.registerQuery("negatives = FILTER confidence_data BY $0 < 0.0;");
        pigServer.registerQuery("positivesGroup = GROUP positives ALL;");
        pigServer.registerQuery("positivesCount = FOREACH positivesGroup GENERATE COUNT(positives);");
        pigServer.registerQuery("negativesGroup = GROUP negatives ALL;");
        pigServer.registerQuery("negativesCount = FOREACH negativesGroup GENERATE COUNT(negatives);");
        pigServer.registerQuery("positivesOrdered = ORDER positives by $0 DESC;");
        pigServer.registerQuery("positivesFiltered = LIMIT positivesOrdered (int) ((positivesCount.$0) * " + keepRatio + " +1);");
        pigServer.registerQuery("positivesFiltered = FOREACH positivesFiltered GENERATE $1..;");
        pigServer.registerQuery("negativesOrdered = ORDER negatives by $0 DESC;");
        pigServer.registerQuery("negativesFiltered = LIMIT negativesOrdered (int) ((negativesCount.$0) * " + keepRatio + " +1);");
        pigServer.registerQuery("negativesFiltered = FOREACH negativesFiltered GENERATE $1..;");
        pigServer.registerQuery("allConfidents = UNION positivesFiltered, negativesFiltered;");
        pigServer.store("allConfidents", outputDir);
    }
}

However, one may want to run a Pig script from Java without embedding the script in strings. I think this is the best way to proceed, because Pig scripts can be a nightmare to debug when they are embedded in Java code. Here’s how to do that.

package com.fujitsu.ca.fic.utils.pig;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;

import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;

public class TopKWithPig {

    private static final String MAPREDUCE_MODE = "mapreduce";
    private static final String LOCAL_MODE = "local";
    private static final String PIG_SCRIPT_NAME = "relative/path/To/script/TopKPercent.pig";

    public void filterWithMapReduce(String inputFileName, String outputDir, double keepRatio) throws ExecException, IOException {
        filterTopK(MAPREDUCE_MODE, inputFileName, outputDir, keepRatio);
    }

    public void filterLocally(String inputFileName, String outputDir, double keepRatio) throws ExecException, IOException {
        filterTopK(LOCAL_MODE, inputFileName, outputDir, keepRatio);
    }

    private void filterTopK(String mode, String inputFileName, String pigOutputDir, double keepRatio) throws ExecException, IOException {
        PigServer pigServer = null;

        // Bind the script's $INPUT, $RATIO and $OUTPUT parameters to the method arguments
        Map<String, String> params = new HashMap<String, String>();
        params.put("INPUT", inputFileName);
        params.put("RATIO", String.valueOf(keepRatio));
        params.put("OUTPUT", pigOutputDir);

        // Load the .pig file from the classpath
        InputStream inputScript = getClass().getClassLoader().getResourceAsStream(PIG_SCRIPT_NAME);

        try {
            pigServer = new PigServer(mode);
            pigServer.setBatchOn();
            pigServer.registerScript(inputScript, params);
            pigServer.executeBatch();
        } finally {
            if (pigServer != null) {
                pigServer.shutdown();
            }
        }
    }
}

Enjoy!

Special thanks to Mathieu Dumoulin. He taught me a lot in this field of expertise.