From 3fc5dd6d438045a6a85b80aee403d321bc2e537f Mon Sep 17 00:00:00 2001 From: Andrew Date: Sat, 22 Mar 2025 14:53:43 +0000 Subject: [PATCH] [CT414]: Add Assignment 2 code --- .../assignment2/code/MapReduceFiles.java | 313 ++++++++++++++++++ 1 file changed, 313 insertions(+) create mode 100644 year4/semester2/CT414/assignments/assignment2/code/MapReduceFiles.java diff --git a/year4/semester2/CT414/assignments/assignment2/code/MapReduceFiles.java b/year4/semester2/CT414/assignments/assignment2/code/MapReduceFiles.java new file mode 100644 index 00000000..6be28d36 --- /dev/null +++ b/year4/semester2/CT414/assignments/assignment2/code/MapReduceFiles.java @@ -0,0 +1,313 @@ +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.io.IOException; +import java.io.File; +import java.io.FileReader; +import java.io.BufferedReader; +import java.util.Scanner; + +public class MapReduceFiles { + + public static void main(String[] args) { + + if (args.length < 3) { + System.err.println("usage: java MapReduceFiles file1.txt file2.txt file3.txt"); + + } + + Map input = new HashMap(); + try { + input.put(args[0], readFile(args[0])); + input.put(args[1], readFile(args[1])); + input.put(args[2], readFile(args[2])); + } + catch (IOException ex) + { + System.err.println("Error reading files...\n" + ex.getMessage()); + ex.printStackTrace(); + System.exit(0); + } + + // APPROACH #1: Brute force + { + Map> output = new HashMap>(); + + Iterator> inputIter = input.entrySet().iterator(); + while(inputIter.hasNext()) { + Map.Entry entry = inputIter.next(); + String file = entry.getKey(); + String contents = entry.getValue(); + + String[] words = contents.trim().split("\\s+"); + + for(String word : words) { + + Map files = output.get(word); + if (files == null) { + files = new HashMap(); + output.put(word, files); + } + + Integer occurrences = files.remove(file); + if (occurrences == null) { + files.put(file, 1); + } else { + files.put(file, occurrences.intValue() + 1); + } + } + } + + // show me: + System.out.println(output); + } + + + // APPROACH #2: MapReduce + { + Map> output = new HashMap>(); + + // MAP: + + List mappedItems = new LinkedList(); + + Iterator> inputIter = input.entrySet().iterator(); + while(inputIter.hasNext()) { + Map.Entry entry = inputIter.next(); + String file = entry.getKey(); + String contents = entry.getValue(); + + map(file, contents, mappedItems); + } + + // GROUP: + + Map> groupedItems = new HashMap>(); + + Iterator mappedIter = mappedItems.iterator(); + while(mappedIter.hasNext()) { + MappedItem item = mappedIter.next(); + String word = item.getWord(); + String file = item.getFile(); + List list = groupedItems.get(word); + if (list == null) { + list = new LinkedList(); + groupedItems.put(word, list); + } + list.add(file); + } + + // REDUCE: + + Iterator>> groupedIter = groupedItems.entrySet().iterator(); + while(groupedIter.hasNext()) { + Map.Entry> entry = groupedIter.next(); + String word = entry.getKey(); + List list = entry.getValue(); + + reduce(word, list, output); + } + + System.out.println(output); + } + + + // APPROACH #3: Distributed MapReduce + { + final Map> output = new HashMap>(); + + // MAP: + + final List mappedItems = new LinkedList(); + + final MapCallback mapCallback = new MapCallback() { + @Override + public synchronized void mapDone(String file, List results) { + mappedItems.addAll(results); + } + }; + + List mapCluster = new ArrayList(input.size()); + + Iterator> inputIter = input.entrySet().iterator(); + while(inputIter.hasNext()) { + Map.Entry entry = inputIter.next(); + final String file = entry.getKey(); + final String contents = entry.getValue(); + + Thread t = new Thread(new Runnable() { + @Override + public void run() { + map(file, contents, mapCallback); + } + }); + mapCluster.add(t); + t.start(); + } + + // wait for mapping phase to be over: + for(Thread t : mapCluster) { + try { + t.join(); + } catch(InterruptedException e) { + throw new RuntimeException(e); + } + } + + // GROUP: + + Map> groupedItems = new HashMap>(); + + Iterator mappedIter = mappedItems.iterator(); + while(mappedIter.hasNext()) { + MappedItem item = mappedIter.next(); + String word = item.getWord(); + String file = item.getFile(); + List list = groupedItems.get(word); + if (list == null) { + list = new LinkedList(); + groupedItems.put(word, list); + } + list.add(file); + } + + // REDUCE: + + final ReduceCallback reduceCallback = new ReduceCallback() { + @Override + public synchronized void reduceDone(String k, Map v) { + output.put(k, v); + } + }; + + List reduceCluster = new ArrayList(groupedItems.size()); + + Iterator>> groupedIter = groupedItems.entrySet().iterator(); + while(groupedIter.hasNext()) { + Map.Entry> entry = groupedIter.next(); + final String word = entry.getKey(); + final List list = entry.getValue(); + + Thread t = new Thread(new Runnable() { + @Override + public void run() { + reduce(word, list, reduceCallback); + } + }); + reduceCluster.add(t); + t.start(); + } + + // wait for reducing phase to be over: + for(Thread t : reduceCluster) { + try { + t.join(); + } catch(InterruptedException e) { + throw new RuntimeException(e); + } + } + + System.out.println(output); + } + } + + public static void map(String file, String contents, List mappedItems) { + String[] words = contents.trim().split("\\s+"); + for(String word: words) { + mappedItems.add(new MappedItem(word, file)); + } + } + + public static void reduce(String word, List list, Map> output) { + Map reducedList = new HashMap(); + for(String file: list) { + Integer occurrences = reducedList.get(file); + if (occurrences == null) { + reducedList.put(file, 1); + } else { + reducedList.put(file, occurrences.intValue() + 1); + } + } + output.put(word, reducedList); + } + + public static interface MapCallback { + + public void mapDone(E key, List values); + } + + public static void map(String file, String contents, MapCallback callback) { + String[] words = contents.trim().split("\\s+"); + List results = new ArrayList(words.length); + for(String word: words) { + results.add(new MappedItem(word, file)); + } + callback.mapDone(file, results); + } + + public static interface ReduceCallback { + + public void reduceDone(E e, Map results); + } + + public static void reduce(String word, List list, ReduceCallback callback) { + + Map reducedList = new HashMap(); + for(String file: list) { + Integer occurrences = reducedList.get(file); + if (occurrences == null) { + reducedList.put(file, 1); + } else { + reducedList.put(file, occurrences.intValue() + 1); + } + } + callback.reduceDone(word, reducedList); + } + + private static class MappedItem { + + private final String word; + private final String file; + + public MappedItem(String word, String file) { + this.word = word; + this.file = file; + } + + public String getWord() { + return word; + } + + public String getFile() { + return file; + } + + @Override + public String toString() { + return "[\"" + word + "\",\"" + file + "\"]"; + } + } + + private static String readFile(String pathname) throws IOException { + File file = new File(pathname); + StringBuilder fileContents = new StringBuilder((int) file.length()); + Scanner scanner = new Scanner(new BufferedReader(new FileReader(file))); + String lineSeparator = System.getProperty("line.separator"); + + try { + if (scanner.hasNextLine()) { + fileContents.append(scanner.nextLine()); + } + while (scanner.hasNextLine()) { + fileContents.append(lineSeparator + scanner.nextLine()); + } + return fileContents.toString(); + } finally { + scanner.close(); + } + } + +}