
How to Use Mahout’s Distributed Machine Learning Framework With Hadoop

OK, this post will be really short (…not!). I present the references I know of on how to use Mahout, and then I show how to use it and how to extend it.

First of all, if you want to use Mahout, you’ll need to install it on your computer. The official Mahout site explains how to install it, along with some examples of how to use it from a bash console.

The thing is, you will almost certainly need to use Mahout from Java, which is quite easy once you know where to start. I strongly recommend the book “Mahout in Action” by Sean Owen, Robin Anil, Ted Dunning and Ellen Friedman. The code examples are not the cleanest, but with them you will be able to do everything you need, and it’s a very complete reference book. Here’s a legit link to their book.

Now, here’s a quick example of how to use it in Java. In this example, we use Mahout’s AdaptiveLogisticRegression learning algorithm (an online, SGD-based learner from the org.apache.mahout.classifier.sgd package):

import java.io.File;
import java.io.IOException;
import java.util.List;

import org.apache.mahout.classifier.sgd.AdaptiveLogisticRegression;
import org.apache.mahout.classifier.sgd.CrossFoldLearner;
import org.apache.mahout.classifier.sgd.L1;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;

public class AdaptiveLogisticRegressionIrisTester {
    public static void main(String[] args) {
        CrossFoldLearner bestLearner = null;

        try {
            // Small helper that parses iris.data into Mahout vectors (a sketch is given below).
            CreateIrisDataset dataLoader = new CreateIrisDataset();

            List<Vector> examples = dataLoader.loadExamplesFromFile(new File("data/iris.data"));
            List<Integer> actualLabels = dataLoader.getLabels();

            bestLearner = trainBestClassifier(dataLoader, examples, actualLabels);

            // Evaluate the best cross-fold learner on the training set.
            double averageCorrect = 0.0;

            for (int i = 0; i < examples.size(); i++) {
                int actual = actualLabels.get(i);
                Vector v = examples.get(i);
                Vector p = new DenseVector(3);

                bestLearner.classifyFull(p, v);
                // Running average of the 0/1 accuracy.
                averageCorrect = averageCorrect + ((p.maxValueIndex() == actual ? 1 : 0) - averageCorrect) / (i + 1);
            }
            System.out.printf("Final training accuracy = %10.2f\n", averageCorrect * 100);

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (bestLearner != null) {
                bestLearner.close();
            }
        }
    }

    private static CrossFoldLearner trainBestClassifier(CreateIrisDataset dataLoader, List<Vector> examples, List<Integer> actualLabels)
            throws IOException {
        AdaptiveLogisticRegression learningAlgorithm =
                new AdaptiveLogisticRegression(dataLoader.categoriesCount(), dataLoader.featuresCount(), new L1());

        try {
            for (int i = 0; i < examples.size(); i++) {
                int actual = actualLabels.get(i);
                Vector v = examples.get(i);
                learningAlgorithm.train(actual, v);
            }
        } finally {
            // close() finalizes training, which is why it is called before getBest().
            learningAlgorithm.close();
        }

        return learningAlgorithm.getBest().getPayload().getLearner();
    }
}
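
The CreateIrisDataset helper used above is not a Mahout class; it’s just a small data loader. Here’s a minimal, hypothetical sketch of what such a loader could look like, assuming the standard UCI iris.data CSV format (four numeric feature columns followed by the class name). The class and method names simply mirror the ones used in the example above.

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;

// Hypothetical loader for the UCI iris.data file (4 numeric features + class name per line).
public class CreateIrisDataset {
    private final List<Integer> labels = new ArrayList<Integer>();
    private final Map<String, Integer> labelIndex = new HashMap<String, Integer>();

    public List<Vector> loadExamplesFromFile(File file) throws IOException {
        List<Vector> examples = new ArrayList<Vector>();
        for (String line : Files.readAllLines(file.toPath(), StandardCharsets.UTF_8)) {
            if (line.trim().isEmpty()) {
                continue;
            }
            String[] fields = line.split(",");

            // The first four columns are the numeric features.
            Vector v = new DenseVector(featuresCount());
            for (int i = 0; i < featuresCount(); i++) {
                v.set(i, Double.parseDouble(fields[i]));
            }
            examples.add(v);

            // The last column is the class name; map it to an integer label.
            String className = fields[fields.length - 1];
            if (!labelIndex.containsKey(className)) {
                labelIndex.put(className, labelIndex.size());
            }
            labels.add(labelIndex.get(className));
        }
        return examples;
    }

    public List<Integer> getLabels() {
        return labels;
    }

    public int categoriesCount() {
        return 3; // setosa, versicolor, virginica
    }

    public int featuresCount() {
        return 4; // sepal length/width, petal length/width
    }
}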

As you just saw, using Mahout’s learning framework is quite easy. I also refer you to my other machine learning posts if you want to use Mahout with Hadoop, and you will find plenty more on Google. (post 1)(post 2)(post 3)

Also, here’s how to add another learning algorithm to Mahout’s framework. We build a perceptron that extends Mahout’s AbstractVectorClassifier. That way, you only have to override a few methods and your classifier can be used in a distributed job; Mahout and Hadoop do the rest, saving you a lot of pain. If you read carefully, you’ll see that our perceptron is trained on a single machine, but the classification step is distributed. Again, I refer you to the article I wrote with Mathieu Dumoulin.

Enjoy!

package com.fujitsu.ca.fic.classifiers.perceptron;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.classifier.AbstractVectorClassifier;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.SparseRowMatrix;
import org.apache.mahout.math.Vector;

// Kernel perceptron exposed to Mahout by extending AbstractVectorClassifier.
public class Perceptron extends AbstractVectorClassifier {
    private GramMatrix gram;
    private RBFKernel kernel;

    private Matrix positiveDataset;
    private Vector alpha;
    private double gamma;

    public Perceptron(Matrix positiveTrainDataset, double gamma) {
        System.out.println("Building Gram Matrix");
        this.kernel = new RBFKernel(gamma);
        this.gamma = gamma;
        this.gram = new GramMatrix(positiveTrainDataset);
        System.out.println("done!");

        this.positiveDataset = positiveTrainDataset;
    }

    // Perceptron training loop: while any training example still gets a non-positive score,
    // increase its weight alpha(i) (capped at C) and update the cached scores K via the Gram matrix.
    public void train(double C) {
        int numRowsInPositiveDataset = positiveDataset.numRows();
        Vector K = new DenseVector(numRowsInPositiveDataset);
        K.assign(0.0);
        alpha = new DenseVector(numRowsInPositiveDataset);
        alpha.assign(0);

        boolean wasUpdated;
        do {
            wasUpdated = false;
            for (int i = 0; i < numRowsInPositiveDataset; i++) {
                if (K.get(i) <= 0 && alpha.get(i) < C) {

                    alpha.set(i, alpha.get(i) + 1);
                    wasUpdated = true;
                    for (int j = 0; j < numRowsInPositiveDataset; j++) {
                        K.set(j, K.get(j) + gram.at(i, j));
                    }
                }
            }

        } while (wasUpdated);
    }

    @Override
    public double classifyScalar(Vector example) {
        // Decision score: the alpha-weighted sum of kernel values between the example
        // and every positive training example.
        double sums = 0.0;

        for (int j = 0; j < positiveDataset.numRows(); j++) {
            sums += alpha.get(j) * kernel.calculateScalarProduct(positiveDataset.viewRow(j), example);
        }
        return sums;
    }

    @Override
    public int numCategories() {
        return 2;
    }

    @Override
    public Vector classify(Vector example) {
        // The perceptron is a binary classifier, so we simply wrap classifyScalar's
        // score in a one-element vector, as AbstractVectorClassifier requires.
        Vector classification = new DenseVector(1);

        classification.set(0, classifyScalar(example));
        return classification;
    }

    // Rebuilds a trained perceptron from a model file previously written with saveToFile().
    public Perceptron(Configuration conf, Path path) throws IOException {
        FSDataInputStream input = null;

        try {
            FileSystem fs = FileSystem.get(conf);
            input = fs.open(path);

            initializeWithBinary(input);
        } finally {
            if (input != null) {
                input.close();
            }
        }

        kernel = new RBFKernel(gamma);
    }

    private void initializeWithBinary(FSDataInputStream input) throws IOException {
        gamma = input.readDouble();

        int alphaSize = input.readInt();
        alpha = new DenseVector(alphaSize);
        for (int row = 0; row < alpha.size(); row++) {
            alpha.set(row, input.readDouble());
        }

        int rowSize = input.readInt();
        int colSize = input.readInt();
        positiveDataset = new SparseRowMatrix(rowSize, colSize);
        for (int row = 0; row < positiveDataset.numRows(); row++) {
            for (int col = 0; col < positiveDataset.numCols(); col++) {
                positiveDataset.set(row, col, input.readDouble());
            }
        }
    }

    // Serializes the trained model (gamma, alpha, and the positive training set) to HDFS
    // so that the distributed classification job can load it.
    public void saveToFile(Configuration conf, Path path) throws IOException {
        FSDataOutputStream output = null;
        try {
            FileSystem fs = FileSystem.get(conf);
            output = fs.create(path);

            // gamma
            output.writeDouble(gamma);
            // alpha
            output.writeInt(alpha.size());
            for (int row = 0; row < alpha.size(); row++) {
                output.writeDouble(alpha.get(row));
            }
            // positiveDataset
            output.writeInt(positiveDataset.numRows());
            output.writeInt(positiveDataset.numCols());
            for (int row = 0; row < positiveDataset.numRows(); row++) {
                for (int col = 0; col < positiveDataset.numCols(); col++) {
                    output.writeDouble(positiveDataset.get(row, col));
                }
            }
        } finally {
            if (output != null) {
                output.close();
            }
        }
    }
}
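
The GramMatrix and RBFKernel classes come from the article mentioned above and aren’t reproduced here. To give you a rough idea, here is a minimal sketch of what they could look like, assuming a standard Gaussian (RBF) kernel, k(x, y) = exp(-gamma * ||x - y||^2), and a Gram matrix that simply precomputes the kernel value for every pair of training rows. How the one-argument GramMatrix constructor picks its kernel is an assumption on my part.

import org.apache.mahout.math.DenseMatrix;
import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.Vector;

// Hypothetical Gaussian (RBF) kernel: k(x, y) = exp(-gamma * ||x - y||^2).
class RBFKernel {
    private final double gamma;

    RBFKernel(double gamma) {
        this.gamma = gamma;
    }

    double calculateScalarProduct(Vector x, Vector y) {
        Vector diff = x.minus(y);
        return Math.exp(-gamma * diff.dot(diff));
    }
}

// Hypothetical Gram matrix: precomputes the kernel value for every pair of training rows.
class GramMatrix {
    private final Matrix values;

    GramMatrix(Matrix dataset) {
        // Assumption: the one-argument constructor used in the Perceptron falls back to a default gamma.
        this(dataset, new RBFKernel(1.0));
    }

    GramMatrix(Matrix dataset, RBFKernel kernel) {
        int n = dataset.numRows();
        values = new DenseMatrix(n, n);
        for (int i = 0; i < n; i++) {
            for (int j = 0; j < n; j++) {
                values.set(i, j, kernel.calculateScalarProduct(dataset.viewRow(i), dataset.viewRow(j)));
            }
        }
    }

    double at(int i, int j) {
        return values.get(i, j);
    }
}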

Oh… I forgot to show an example of how to call a MapReduce job with Hadoop. I refer you to this post (http://www.blogdugas.net/?p=100) if you want to know how to write a tested mapper or reducer. Once you have implemented these classes, here’s how to call a MapReduce job. Enjoy!

import java.io.IOException;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.math.VectorWritable;

import com.fujitsu.ca.fic.classifiers.perceptron.Perceptron;
import com.fujitsu.ca.fic.mapreduce.FindConfidentExamplesReducer;
import com.fujitsu.ca.fic.mapreduce.UnlabeledKddMapper;

public class ClassifyWithPerceptronJob {
    private String inputFilename = "data/kdd1999/unlabled";
    private String outputDirname = "data/perceptron/output";

    private static final String MODEL_PATH = "model/perceptron.model";
    private Configuration conf;

    public ClassifyWithPerceptronJob(String inputFilename, String outputDirname) {
        this.inputFilename = inputFilename;
        this.outputDirname = outputDirname;

        conf = setupConfiguration();
    }

    private Configuration setupConfiguration() {
        conf = new Configuration();

        conf.set("mapred.compress.map.output", "true");
        conf.set("mapred.output.compression.type", "BLOCK");
        conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
                + "org.apache.hadoop.io.serializer.WritableSerialization");

        return conf;
    }

    public void applyConfidenceScoreToUnlabeled(Perceptron trainedPerceptron) throws IOException, InterruptedException,
            ClassNotFoundException, URISyntaxException {

        // Save the trained perceptron to HDFS so that every task of the job can load the same model.
        Path modelPath = new Path(MODEL_PATH);
        HadoopUtil.delete(conf, modelPath);
        trainedPerceptron.saveToFile(conf, modelPath);

        Path input = new Path(inputFilename);
        Path output = new Path(outputDirname);
        HadoopUtil.delete(conf, output);

        Job job = new Job(conf, "Compute Confidence of Unlabeled with Perceptron");
        job.setJarByClass(getClass());

        job.setMapperClass(UnlabeledKddMapper.class);
        job.setReducerClass(FindConfidentExamplesReducer.class);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(VectorWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        job.waitForCompletion(true);
    }
}
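
The UnlabeledKddMapper and FindConfidentExamplesReducer classes also come from the article and aren’t reproduced here. To give a rough idea of the distributed classification step, here is a minimal, hypothetical sketch of such a mapper: it parses each unlabeled KDD’99 record into a Mahout vector and forwards it keyed by its offset in the input file. The field parsing, and the choice of doing the perceptron scoring in the reducer rather than in the mapper, are assumptions for illustration, not necessarily how the original code does it.

package com.fujitsu.ca.fic.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

// Hypothetical mapper: turns each comma-separated, unlabeled KDD'99 record into a Mahout
// vector and forwards it, keyed by its position in the input file.
public class UnlabeledKddMapper extends Mapper<LongWritable, Text, LongWritable, VectorWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");

        // Assumes the record has already been preprocessed into purely numeric fields.
        Vector example = new DenseVector(fields.length);
        for (int i = 0; i < fields.length; i++) {
            example.set(i, Double.parseDouble(fields[i]));
        }

        context.write(key, new VectorWritable(example));
    }
}

In this layout, the matching reducer would load the saved model with the Perceptron(Configuration, Path) constructor, call classifyScalar on each vector, and write out only the examples whose confidence passes some threshold.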

Don’t hesitate to contact me if you have any questions! I sincerely hope this article will be useful to somebody someday!