NPRG042 Programming in Parallel Environment

Labs 05 - Spark (result notes)

Lab work specification

The initial script for counting words can be copied from the slides. For better splitting, one may use the import re module and later

re.split(r'[^a-zA-Z0-9\-_]+', line)

to tokenize the line. To remove empty tokens, we can improve it to:

[w for w in re.split(r'[^a-zA-Z0-9\-_]+', line) if w]
  1. Count only words that have 3 characters at most. A simple filter will do and we can use the count action to get the number of words:

    words = wiki.flatMap(lambda line: re.split(r'[^a-zA-Z0-9\-_]+', line)) \
    .filter(lambda word: word and len(word) <= 3) \
    .count()
  2. Let's find out, how many words there are for every possible starting letter (character). First, we need to preprocess the data a bit, so we extract the first letter from each word and convert it to (letter, 1) pair. Having a collection of pairs is useful for various transformations, such as reduceByKey, which will aggregate data by the first value in the pair (the letter) and given function sums all the 1 into total word count for that letter. Finally, we sort the result by the letter for better readability:

    words = wiki.flatMap(lambda line: re.split(r'[^a-zA-Z0-9\-_]+', line)) \
    .filter(lambda word: word) \
    .map(lambda word: (word[0].upper(), 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .sortByKey() \
    .collect()
  3. Print out 10 of the most frequent words which have at least 3 characters. We use a similar approach, but the first value in the pair is the word itself. Since we are interested only in the top-k result, it is better to use takeOrdered instead of sorting the whole collection. Note the key=lambda p: -p[1], a trick that makes the take sort in descending order:

    words = wiki.flatMap(lambda line: re.split(r'[^a-zA-Z0-9\-_]+', line)) \
    .filter(lambda word: len(word) >= 3) \
    .map(lambda word: (word.lower(), 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .takeOrdered(10, key=lambda p: -p[1]) \
  4. Get a list of all words that are both in Czech and English wiki and list top-10 of them. This task introduces new operation .join() which takes collection of pairs (K,V), joins them with pairs (K,W) and produces a set (K,(V,W)). For ordering, we take a rather special metric that multiplies the occurrences in both languages. The occurrences are first divided (as integers) to compensate for the different sizes of the datasets and to eliminate too infrequent words:

    
    wiki_en = sc.textFile("/home/_teaching/para/labs/spark/data/wiki-en.txt")
    wiki_cs = sc.textFile("/home/_teaching/para/labs/spark/data/wiki-cs.txt")
    words_cs = wiki_cs.flatMap(lambda line: re.split(r'[^a-zA-Z0-9\-_]+', line)) \
    .filter(lambda word: re.match(r'^[a-zA-Z]{3,}$', word)) \
    .map(lambda word: (word.lower(), 1)) \
    .reduceByKey(lambda x, y: x + y)

words = wikien.flatMap(lambda line: re.split(r'[^a-zA-Z0-9-]+', line)) \ .filter(lambda word: re.match(r'^[a-zA-Z]{3,}$', word)) \ .map(lambda word: (word.lower(), 1)) \ .reduceByKey(lambda x, y: x + y) \ .join(words_cs) \ .takeOrdered(10, key=lambda p: -((p[1][0] // 50000) * (p[1][1] // 5000)))