import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.io.IOException;
import java.io.File;
import java.io.FileReader;
import java.io.BufferedReader;
import java.util.Scanner;

/**
 * Builds a word index over three text files: for every whitespace-separated
 * word, a map from file name to the number of times the word occurs in that
 * file. The same index is computed three ways and printed after each one:
 * <ol>
 *   <li>a direct nested loop ("brute force"),</li>
 *   <li>a single-threaded map / group / reduce pipeline,</li>
 *   <li>the same pipeline with the map and reduce steps run on worker threads.</li>
 * </ol>
 */
public class MapReduceFiles {

    /**
     * Reads the first three files named in {@code args}, then prints the word
     * index produced by each of the three approaches to standard output.
     *
     * <p>Exits with status 1 when fewer than three arguments are supplied or
     * when any of the files cannot be read.
     *
     * @param args paths of the input files; at least three are required and
     *             only the first three are read
     */
    public static void main(String[] args) {
        if (args.length < 3) {
            System.err.println("usage: java MapReduceFiles file1.txt file2.txt file3.txt");
            // Without this exit the code below would index past the end of args.
            System.exit(1);
        }

        Map<String, String> input = new HashMap<String, String>();
        try {
            input.put(args[0], readFile(args[0]));
            input.put(args[1], readFile(args[1]));
            input.put(args[2], readFile(args[2]));
        } catch (IOException ex) {
            System.err.println("Error reading files...\n" + ex.getMessage());
            ex.printStackTrace();
            // A failed read is an error, so report a non-zero status.
            System.exit(1);
        }

        // APPROACH #1: Brute force
        System.out.println(bruteForce(input));

        // APPROACH #2: MapReduce
        System.out.println(mapReduce(input));

        // APPROACH #3: Distributed MapReduce
        System.out.println(distributedMapReduce(input));
    }

    /** Counts word occurrences per file with a direct nested loop. */
    private static Map<String, Map<String, Integer>> bruteForce(Map<String, String> input) {
        Map<String, Map<String, Integer>> output = new HashMap<String, Map<String, Integer>>();
        for (Map.Entry<String, String> entry : input.entrySet()) {
            String file = entry.getKey();
            for (String word : tokenize(entry.getValue())) {
                Map<String, Integer> files = output.get(word);
                if (files == null) {
                    files = new HashMap<String, Integer>();
                    output.put(word, files);
                }
                increment(files, file);
            }
        }
        return output;
    }

    /** Runs the map, group and reduce phases sequentially on the calling thread. */
    private static Map<String, Map<String, Integer>> mapReduce(Map<String, String> input) {
        Map<String, Map<String, Integer>> output = new HashMap<String, Map<String, Integer>>();

        // MAP:
        List<MappedItem> mappedItems = new LinkedList<MappedItem>();
        for (Map.Entry<String, String> entry : input.entrySet()) {
            map(entry.getKey(), entry.getValue(), mappedItems);
        }

        // GROUP:
        Map<String, List<String>> groupedItems = group(mappedItems);

        // REDUCE:
        for (Map.Entry<String, List<String>> entry : groupedItems.entrySet()) {
            reduce(entry.getKey(), entry.getValue(), output);
        }
        return output;
    }

    /**
     * Runs the map phase with one thread per input file and the reduce phase
     * with one thread per distinct word; grouping happens on the calling
     * thread between the two phases.
     */
    private static Map<String, Map<String, Integer>> distributedMapReduce(Map<String, String> input) {
        final Map<String, Map<String, Integer>> output = new HashMap<String, Map<String, Integer>>();

        // MAP:
        final List<MappedItem> mappedItems = new LinkedList<MappedItem>();
        final MapCallback<String, MappedItem> mapCallback = new MapCallback<String, MappedItem>() {
            // synchronized: every mapper thread appends to the same list.
            @Override
            public synchronized void mapDone(String file, List<MappedItem> results) {
                mappedItems.addAll(results);
            }
        };

        List<Thread> mapCluster = new ArrayList<Thread>(input.size());
        for (Map.Entry<String, String> entry : input.entrySet()) {
            final String file = entry.getKey();
            final String contents = entry.getValue();
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    map(file, contents, mapCallback);
                }
            });
            mapCluster.add(t);
            t.start();
        }

        // wait for mapping phase to be over:
        joinAll(mapCluster);

        // GROUP:
        Map<String, List<String>> groupedItems = group(mappedItems);

        // REDUCE:
        final ReduceCallback<String, String, Integer> reduceCallback =
                new ReduceCallback<String, String, Integer>() {
            // synchronized: every reducer thread writes into the same map.
            @Override
            public synchronized void reduceDone(String k, Map<String, Integer> v) {
                output.put(k, v);
            }
        };

        List<Thread> reduceCluster = new ArrayList<Thread>(groupedItems.size());
        for (Map.Entry<String, List<String>> entry : groupedItems.entrySet()) {
            final String word = entry.getKey();
            final List<String> list = entry.getValue();
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    reduce(word, list, reduceCallback);
                }
            });
            reduceCluster.add(t);
            t.start();
        }

        // wait for reducing phase to be over:
        joinAll(reduceCluster);

        return output;
    }

    /** Blocks until every thread in {@code threads} has finished. */
    private static void joinAll(List<Thread> threads) {
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can still see the interrupt.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /** Groups mapped items by word, collecting the file name of every occurrence. */
    private static Map<String, List<String>> group(List<MappedItem> mappedItems) {
        Map<String, List<String>> groupedItems = new HashMap<String, List<String>>();
        for (MappedItem item : mappedItems) {
            String word = item.getWord();
            List<String> list = groupedItems.get(word);
            if (list == null) {
                list = new LinkedList<String>();
                groupedItems.put(word, list);
            }
            list.add(item.getFile());
        }
        return groupedItems;
    }

    /**
     * Splits text into whitespace-separated words.
     *
     * @return the words in order; an empty array when {@code contents} is
     *         empty or only whitespace (a bare {@code split} would instead
     *         yield a single empty-string "word")
     */
    private static String[] tokenize(String contents) {
        String trimmed = contents.trim();
        if (trimmed.length() == 0) {
            return new String[0];
        }
        return trimmed.split("\\s+");
    }

    /** Adds one to the count stored under {@code key}, starting from 1 when absent. */
    private static void increment(Map<String, Integer> counts, String key) {
        Integer current = counts.get(key);
        if (current == null) {
            counts.put(key, Integer.valueOf(1));
        } else {
            counts.put(key, Integer.valueOf(current.intValue() + 1));
        }
    }

    /** Counts how many times each file name appears in {@code files}. */
    private static Map<String, Integer> countOccurrences(List<String> files) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String file : files) {
            increment(counts, file);
        }
        return counts;
    }

    /**
     * Map step: appends one (word, file) item to {@code mappedItems} for every
     * word occurrence in {@code contents}.
     *
     * @param file        name recorded on each emitted item
     * @param contents    text to split on whitespace
     * @param mappedItems list that receives the emitted items
     */
    public static void map(String file, String contents, List<MappedItem> mappedItems) {
        for (String word : tokenize(contents)) {
            mappedItems.add(new MappedItem(word, file));
        }
    }

    /**
     * Reduce step: stores under {@code word} a map from file name to the number
     * of times that file name appears in {@code list}.
     *
     * @param word   key to store the result under
     * @param list   one file name per occurrence of the word
     * @param output map that receives the per-file counts
     */
    public static void reduce(String word, List<String> list, Map<String, Map<String, Integer>> output) {
        output.put(word, countOccurrences(list));
    }

    /**
     * Receives the result of a map step.
     *
     * @param <E> key type
     * @param <V> type of the mapped values
     */
    public static interface MapCallback<E, V> {
        public void mapDone(E key, List<V> values);
    }

    /**
     * Map step, callback form: builds the (word, file) items for
     * {@code contents} and hands the complete list to {@code callback} in a
     * single call.
     *
     * @param file     name recorded on each emitted item and passed as the key
     * @param contents text to split on whitespace
     * @param callback invoked exactly once with all items for this file
     */
    public static void map(String file, String contents, MapCallback<String, MappedItem> callback) {
        String[] words = tokenize(contents);
        List<MappedItem> results = new ArrayList<MappedItem>(words.length);
        for (String word : words) {
            results.add(new MappedItem(word, file));
        }
        callback.mapDone(file, results);
    }

    /**
     * Receives the result of a reduce step.
     *
     * @param <E> type of the reduced key
     * @param <K> key type of the result map
     * @param <V> value type of the result map
     */
    public static interface ReduceCallback<E, K, V> {
        public void reduceDone(E e, Map<K, V> results);
    }

    /**
     * Reduce step, callback form: counts how often each file name appears in
     * {@code list} and passes the counts to {@code callback}.
     *
     * @param word     key passed through to the callback
     * @param list     one file name per occurrence of the word
     * @param callback invoked exactly once with the per-file counts
     */
    public static void reduce(String word, List<String> list, ReduceCallback<String, String, Integer> callback) {
        callback.reduceDone(word, countOccurrences(list));
    }

    /** One occurrence of a word, tagged with the file it was found in. */
    private static class MappedItem {

        private final String word;
        private final String file;

        public MappedItem(String word, String file) {
            this.word = word;
            this.file = file;
        }

        public String getWord() {
            return word;
        }

        public String getFile() {
            return file;
        }

        @Override
        public String toString() {
            return "[\"" + word + "\",\"" + file + "\"]";
        }
    }

    /**
     * Reads a whole text file into a string. Lines are joined with the
     * platform line separator and no separator is appended after the last line.
     *
     * @param pathname path of the file to read
     * @return the file contents
     * @throws IOException if the file cannot be opened or a read fails part-way
     */
    private static String readFile(String pathname) throws IOException {
        File file = new File(pathname);
        StringBuilder fileContents = new StringBuilder((int) file.length());
        Scanner scanner = new Scanner(new BufferedReader(new FileReader(file)));
        String lineSeparator = System.getProperty("line.separator");

        try {
            if (scanner.hasNextLine()) {
                fileContents.append(scanner.nextLine());
            }
            while (scanner.hasNextLine()) {
                fileContents.append(lineSeparator).append(scanner.nextLine());
            }
            // Scanner swallows read errors and simply stops; surface them so a
            // truncated read is not mistaken for the full file.
            IOException failure = scanner.ioException();
            if (failure != null) {
                throw failure;
            }
            return fileContents.toString();
        } finally {
            scanner.close();
        }
    }
}