MapReduce: Passing an Additional File
In an earlier post, we discussed passing additional parameters to a MapReduce job. There are also cases where we need to pass additional files along with the job. Since MapReduce runs on multiple nodes, we have to make sure that any file the mapper/reducer refers to is present on the particular node where it is running. In this post we will discuss how to handle this. Let us say we need to find the most popular movie in the MovieLens database. If you download the MovieLens data, there are two files we are interested in: u.data and u.item. Their formats are shown here:
u.data (userid, movieid, rating, timestamp):

196	242	3	881250949
186	302	3	891717742
22	377	1	878887116
244	51	2	880606923
166	346	1	886397596

u.item (movieID | name | release date | | URL):

1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?|...
2|GoldenEye (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact|..
3|Four Rooms (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?|...
4|Get Shorty (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?|..
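Before jumping into the full job, here is a quick sketch (plain Python, nothing mrjob-specific) of how one line of each file parses; the sample lines are copied from the excerpts above:

# u.data is tab-separated: userid, movieid, rating, timestamp
line = "196\t242\t3\t881250949"
userId, movieID, rating, timeStamp = line.split('\t')
print(movieID, rating)        # -> 242 3

# u.item is pipe-separated; the first two fields are movie ID and name
item = "1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?|"
fields = item.split('|')
print(fields[0], fields[1])   # -> 1 Toy Story (1995)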
Implementation:
from mrjob.job import MRJob
from mrjob.step import MRStep

'''
Goal: calculate the most popular movie from the MovieLens data.
Algorithm: the most watched movie is the most popular movie.
We also pass the u.item file along with the job, so that it is
available on each node and we can map a movieID to a movie name.
'''

class MRMostPopularMovieName(MRJob):

    def configure_options(self):
        super(MRMostPopularMovieName, self).configure_options()
        self.add_file_option('--items', help='path to u.item')

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_count,
                   reducer_init=self.reducer_init,
                   reducer=self.reducer_sum_count),
            MRStep(reducer=self.reducer_max_count)
        ]

    def mapper_get_count(self, _, line):
        # u.data is tab-separated: userid, movieid, rating, timestamp
        userId, movieID, rating, timeStamp = line.split('\t')
        yield movieID, 1

    def reducer_init(self):
        # u.item ships with the job, so it is readable by its basename
        # in the task's working directory on every node.
        self.movie_names = {}
        with open("u.item") as f:
            for line in f:
                fields = line.split('|')
                self.movie_names[fields[0]] = fields[1]

    def reducer_sum_count(self, movieID, values):
        # Key everything under None so the next step sees a single group.
        yield None, (sum(values), self.movie_names[movieID])

    def reducer_max_count(self, _, views):
        # Tuples compare element by element, so max() picks the
        # (count, name) pair with the highest count.
        yield max(views)

if __name__ == '__main__':
    MRMostPopularMovieName.run()
>> python most_popular_movie_name.py --items data\ml-100k\u.item data\ml-100k\u.data
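The command above uses mrjob's default local runner. The same script can be pointed at a real cluster with the -r switch; a hedged example, assuming Hadoop is set up for mrjob (paths unchanged from above):

>> python most_popular_movie_name.py -r hadoop --items data\ml-100k\u.item data\ml-100k\u.data

With the hadoop runner, mrjob uploads the --items file to the task nodes (via Hadoop's distributed cache), so open("u.item") works there just as it does locally.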
Configure Options

In configure_options we use add_file_option to specify that a file will be passed along with this MapReduce job and should be copied to each node. We have also specified that the file is identified by the --items switch on the command line, alongside the actual data file.
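A note on versions: mrjob 0.6 and later renamed these hooks when it moved from optparse to argparse. On a recent install the equivalent would be roughly the following (an assumption about your mrjob version; the code in this post uses the older names):

def configure_args(self):
    # mrjob 0.6+ spelling of configure_options/add_file_option
    super(MRMostPopularMovieName, self).configure_args()
    self.add_file_arg('--items', help='path to u.item')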
MRJob Step

This job has two steps, and the first step includes a reducer_init, which is called once before that step's reducer starts processing groups. In reducer_init we build a dictionary of movie names from the u.item file, which is present on the same node where the mapper/reducer is running.
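Given the u.item sample shown earlier, the dictionary built in reducer_init would look like this (keys are movie IDs kept as strings, since they come straight from the file):

{'1': 'Toy Story (1995)', '2': 'GoldenEye (1995)', '3': 'Four Rooms (1995)', '4': 'Get Shorty (1995)'}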
Another important thing to note in this example is that reducer_sum_count yields None as the key and a tuple of (sum(values), self.movie_names[movieID]) as the value. Because every record shares the same key, they all reach a single reducer in the next step, which takes max(views) and produces a single result.
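This works because Python compares tuples element by element, so max() over (count, name) pairs picks the pair with the highest count. A tiny standalone illustration (movie names from the sample data above; the counts are made up):

views = [(452, 'Toy Story (1995)'), (394, 'GoldenEye (1995)'), (90, 'Four Rooms (1995)')]
print(max(views))   # -> (452, 'Toy Story (1995)')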