NPRG042 Programming in Parallel Environment
Labs 05 - Spark (result notes)
The initial script for counting words can be copied from the slides. For better splitting, one may import the re module (import re) and later use
    re.split(r'[^a-zA-Z0-9\-_]+', line)
to tokenize the line. To remove empty tokens, we can improve it to:
    [w for w in re.split(r'[^a-zA-Z0-9\-_]+', line) if w]
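To see why the empty-token filter is needed, the following minimal sketch runs both expressions on a single line in plain Python (no Spark required). The sample line and the tokenize helper name are made up for illustration.

```python
import re

# Same character class as in the notes: anything that is not a letter,
# digit, hyphen, or underscore acts as a separator.
TOKEN_SPLIT = re.compile(r'[^a-zA-Z0-9\-_]+')


def tokenize(line):
    """Split a line into tokens and drop the empty strings (hypothetical helper)."""
    return [w for w in TOKEN_SPLIT.split(line) if w]


# Made-up sample line with separators at both ends.
line = "  Hello, world! It's a well-known test_case.  "

raw = TOKEN_SPLIT.split(line)   # plain split keeps '' at the start and end
tokens = tokenize(line)         # filtered version

print(raw)
print(tokens)
```

Separators at the beginning or end of a line are what produce the empty strings, so the filter matters for almost every real line of text.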
- Count only words that have 3 characters at most. A simple filter will do, and we can use the count action to get the number of words:
    words = wiki.flatMap(lambda line: re.split(r'[^a-zA-Z0-9\-_]+', line)) \
        .filter(lambda word: word and len(word) <= 3) \
        .count()
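The truthiness check on word in the filter matters because an empty string also has at most 3 characters. The following plain-Python sketch (a local stand-in, not Spark code; the sample lines are made up) mimics the flatMap, filter, and count steps on a small list and compares the result with and without that check.

```python
import re

# Made-up sample lines; in the lab these would come from the wiki RDD.
lines = ["  The cat sat on the mat. ", "A dog, too!"]

# Stand-in for flatMap: one flat list of tokens from all lines.
tokens = [w for line in lines for w in re.split(r'[^a-zA-Z0-9\-_]+', line)]

# Stand-in for filter + count, with and without the empty-token check.
with_check = sum(1 for w in tokens if w and len(w) <= 3)
without_check = sum(1 for w in tokens if len(w) <= 3)

print(with_check, without_check)
```

Here the second number is larger because every empty token produced by the split passes the length test.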
- Let's find out how many words there are for every possible starting letter (character). First, we need to preprocess the data a bit: we extract the first letter from each word and convert it to a (letter, 1) pair. Having a collection of pairs is useful for various transformations, such as reduceByKey, which aggregates the data by the first value in the pair (the letter); the given function sums all the 1s into the total word count for that letter. Finally, we sort the result by the letter for better readability:
    words = wiki.flatMap(lambda line: re.split(r'[^a-zA-Z0-9\-_]+', line)) \
        .filter(lambda word: word) \
        .map(lambda word: (word[0].upper(), 1)) \
        .reduceByKey(lambda x, y: x + y) \
        .sortByKey() \
        .collect()
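What reduceByKey computes can be illustrated locally. The sketch below is a plain-Python stand-in, not the Spark implementation: the reduce_by_key helper and the word list are made up, and it ignores partitioning entirely. It only shows how the (letter, 1) pairs are folded per key with the same lambda.

```python
from collections import defaultdict
from functools import reduce


def reduce_by_key(pairs, func):
    """Hypothetical local stand-in: fold the values of each key with func."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {key: reduce(func, values) for key, values in groups.items()}


# Made-up words; in the lab these come from the tokenized wiki.
words = ["apple", "Avocado", "banana", "blueberry", "Bean", "cherry"]

# Same preprocessing as in the Spark pipeline: (first letter, 1).
pairs = [(w[0].upper(), 1) for w in words]

# Sum the 1s per letter, then sort by letter (stand-in for sortByKey).
counts = sorted(reduce_by_key(pairs, lambda x, y: x + y).items())
print(counts)  # [('A', 2), ('B', 3), ('C', 1)]
```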
- Print out 10 of the most frequent words which have at least 3 characters. We use a similar approach, but the first value in the pair is the word itself. Since we are interested only in the top-k result, it is better to use takeOrdered instead of sorting the whole collection. Note the key=lambda p: -p[1], a trick that makes takeOrdered sort in descending order:
    words = wiki.flatMap(lambda line: re.split(r'[^a-zA-Z0-9\-_]+', line)) \
        .filter(lambda word: len(word) >= 3) \
        .map(lambda word: (word.lower(), 1)) \
        .reduceByKey(lambda x, y: x + y) \
        .takeOrdered(10, key=lambda p: -p[1])
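The negated key can be tried out without Spark. takeOrdered returns the elements with the smallest key values in ascending key order, so a rough local analogue is heapq.nsmallest from the standard library. The sketch below uses made-up counts and is only meant to show the effect of the negation.

```python
import heapq

# Made-up (word, count) pairs standing in for the reduceByKey output.
counts = [("the", 120), ("and", 75), ("spark", 8), ("data", 42), ("lab", 3)]

# Smallest negated count == largest count, so this selects the top 3.
top3 = heapq.nsmallest(3, counts, key=lambda p: -p[1])

# The same selection expressed directly, without the negation trick.
top3_direct = heapq.nlargest(3, counts, key=lambda p: p[1])

print(top3)  # [('the', 120), ('and', 75), ('data', 42)]
```

Selecting the top k this way avoids ordering the elements that never make it into the result, which is the reason for preferring it over a full sort.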
- Get a list of all words that are in both the Czech and the English wiki and list the top 10 of them. This task introduces a new operation, .join(), which takes a collection of pairs (K, V), joins it with pairs (K, W), and produces pairs (K, (V, W)). For ordering, we take a rather special metric that multiplies the occurrences in both languages. The occurrences are first divided (as integers) to compensate for the different sizes of the datasets and to eliminate words that are too infrequent:
    wiki_en = sc.textFile("/home/_teaching/para/labs/spark/data/wiki-en.txt")
    wiki_cs = sc.textFile("/home/_teaching/para/labs/spark/data/wiki-cs.txt")

    words_cs = wiki_cs.flatMap(lambda line: re.split(r'[^a-zA-Z0-9\-_]+', line)) \
        .filter(lambda word: re.match(r'^[a-zA-Z]{3,}$', word)) \
        .map(lambda word: (word.lower(), 1)) \
        .reduceByKey(lambda x, y: x + y)

    words = wiki_en.flatMap(lambda line: re.split(r'[^a-zA-Z0-9\-_]+', line)) \
        .filter(lambda word: re.match(r'^[a-zA-Z]{3,}$', word)) \
        .map(lambda word: (word.lower(), 1)) \
        .reduceByKey(lambda x, y: x + y) \
        .join(words_cs) \
        .takeOrdered(10, key=lambda p: -((p[1][0] // 50000) * (p[1][1] // 5000)))
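The shape of the joined pairs and the effect of the integer division are easier to see on a tiny example. The following plain-Python sketch is a local stand-in, not Spark code: the join helper and all counts are made up, and it assumes each key occurs once per side, which holds here because both inputs come out of reduceByKey.

```python
def join(left, right):
    """Hypothetical local inner join of two {key: value} mappings."""
    return [(k, (left[k], right[k])) for k in left if k in right]


# Made-up word counts for the English and Czech datasets.
en = {"internet": 150000, "the": 900000, "program": 60000, "hello": 40000}
cs = {"internet": 12000, "program": 4000, "ahoj": 30000}

# Only keys present on both sides survive: (word, (en_count, cs_count)).
joined = join(en, cs)


def score(p):
    """Same metric as in the notes: scaled English count times scaled Czech count."""
    return (p[1][0] // 50000) * (p[1][1] // 5000)


# Descending order by score, as the negated key does in takeOrdered.
ranked = sorted(joined, key=lambda p: -score(p))
print([(word, score((word, c))) for word, c in ranked])
# [('internet', 6), ('program', 0)]
```

Since the English side comes first in the join, p[1][0] is the English count and p[1][1] the Czech one. A word whose count falls below the divisor on either side gets a score of 0, which is how the metric pushes infrequent words to the bottom.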