Now that we've seen pair RDDs, in this session we're going to focus on some of the most commonly used pair RDD operations that you'll find in the wild. Just like we saw before for regular RDDs, these common operations can be broken into two categories: transformations and actions. There are several special transformations specific to pair RDDs that we'll cover, and one special pair RDD action called countByKey. Now, if we look at some of the transformations that we'll cover, many look familiar. If you remember the Functional Programming in Scala course, recall operations like groupBy, for example. So groupByKey seems kind of similar. And of course, remember things like reduce. mapValues, this one here, sounds kind of like map. And we might have some idea of what join is from experience with databases in the past. So these sound familiar as well. But let's drill a little deeper into some of these operations. Let's start with groupByKey. To do that, let's first recall groupBy from regular Scala collections. Do you remember what this operation does? If we look at the API documentation, groupBy has the following signature: it takes a function from A to K, that is, a function from the element type to the type of the key that the groupBy should return. And it returns a Map, where each key maps to a collection of elements of type A. The API docs succinctly describe the semantics of this operation as follows: groupBy partitions this traversable collection into a map of traversable collections according to some discriminator function. We can also say this a little more simply by looking at it the following way. Assuming you have some kind of collection, the groupBy operation breaks up that collection into two or more collections, according to the function you pass to groupBy. The results of that function are the keys. So if you apply the function to an individual element of the original collection, whatever the result of that function application is, that's the key that the element will correspond to in the resulting map returned by groupBy. So the map returned by groupBy contains keys mapped to collections of the values corresponding to those keys. It still sounds a little confusing, doesn't it? I'm trying to make it simpler, but I don't know if you can visualize it. So let's make this a little clearer by looking at an example with regular Scala collections. Let's assume that we have a list of integers here, called ages, and these integers represent the ages of people. Now let's group this list of ages into categories: one for children, one for adults, and one for seniors. So we have three categories. That means we need a function that, when applied to the integers in this list, determines which key each age should correspond to. So let's use a simple conditional expression. If the age is greater than or equal to 18 and less than 65, then we know this person has to be an adult. Else, if this person is under 18 years old, then this person is a child. And finally, anything else, so 65 or over, and we say this person is a senior. So now what groupBy does here is apply this function to each element in the ages list.
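As a minimal sketch of that example in plain Scala (the specific ages in the list are just illustrative, not from the lecture):

```scala
val ages = List(2, 52, 44, 23, 17, 14, 12, 82, 51, 64)

// The discriminator function returns the key each age belongs under.
val grouped = ages.groupBy { age =>
  if (age >= 18 && age < 65) "adult"
  else if (age < 18) "child"
  else "senior"
}
// grouped: Map[String, List[Int]] =
//   Map(senior -> List(82),
//       adult  -> List(52, 44, 23, 51, 64),
//       child  -> List(2, 17, 14, 12))
```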
And it determines the string that corresponds to each age, whether it's a child, adult, or a senior. It then produces a map as a result, with a list of the ages that correspond to each key. So now we have a map containing three keys, senior, adult, and child, where the corresponding values are lists of the ages from our original list that fall into those categories, that are grouped into those categories. Hence why we call the operation groupBy: we're grouping elements into a category based on some discriminating function. I hope that makes the groupBy operation a little clearer now. So let's get back to Spark. Spark's groupByKey operation can be thought of as groupBy on pair RDDs, but specialized to grouping all of the values that have the same key. That means we're doing it on a collection that's already a collection of pairs, or in this case an RDD of pairs, a pair RDD. And that means we don't need the discriminating function anymore; groupByKey takes no arguments. In this case, we only want to group things by the keys that they already have. So we want to take an RDD that's full of many key-value pairs, and return an RDD where all of the values that correspond to a specific key have been collected into some kind of Iterable, some kind of regular Scala collection. So we go from countless little pairs of keys and values, with possibly many of the same keys floating around with different values, to just one entry per key, mapped to a collection of all the values that correspond to that key, grouped into a regular Scala collection. To make that a bit more concrete, let's look at an example. Let's say we have a case class called Event that represents events. Each event has an organizer, so let's just say there's a concert or something coming up and somebody is organizing it. The event has a name, that's the string here. And each event has a budget of some kind. Our first step is to make a pair RDD. We do that with this map function here, and we arrange things so that our key is the event organizer's name, and the value is the budget for the given event. Now, we call groupByKey on that new pair RDD we just created on the previous line. So, what does this call do? I'll let you think about it for a moment. Remember, the key is the organizer. Haha, it was a trick question. The answer is that it does nothing as it is; it returns a reference to an unevaluated RDD. Remember that groupByKey is a transformation. It's a transformation, so that means it's lazy and nothing happens when you call it. But okay, if we call an action on it, in this case collect, here, that forces the computation to take place, and we get back an array from our RDD. Then we can call foreach on that array with println to print the elements returned by collect and see what the result looks like. Assuming we have a few events that have been organized by the same organizer, the result will look like this. The organizer Prime Sound, for example, has just one event with a budget of 42,000, dollars or francs or whatever currency it's in. And the other organizer, called Sport Org, seems to be organizing three different events, with budgets of 23,000, 12,000, and 1,400.
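Here's a sketch of what that example might look like in code, assuming a sequence of events and a SparkContext named sc are already in scope (the sample output below is illustrative):

```scala
case class Event(organizer: String, name: String, budget: Int)

// Build a pair RDD: key = organizer, value = budget.
val eventsRDD = sc.parallelize(events)
  .map(event => (event.organizer, event.budget))

// groupByKey is a lazy transformation -- nothing runs yet.
val groupedRDD = eventsRDD.groupByKey()

// collect is an action; it forces the computation and returns an array.
groupedRDD.collect().foreach(println)
// (Prime Sound,CompactBuffer(42000))
// (Sport Org,CompactBuffer(23000, 12000, 1400))
```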
So that's groupByKey, and it's actually a method that's quite often reached for on large data sets. You might find yourself deciding, my gosh, it would be really useful to group these things by key and then do something with the grouped elements. However, there's another important transformation on pair RDDs, called reduceByKey. Conceptually, you can think of reduceByKey as two operations: first, the groupByKey that we saw on the previous slide, and then reducing over all of the values in the collection that corresponds to each key, via some function. And what's really, really important to note here is that reduceByKey is actually a lot more efficient than doing groupByKey and then reduce independently. We'll see why shortly, but this is an important little note that you should hold on to. So let's look at the signature of reduceByKey. As we can see, reduceByKey takes a function which only cares about the values of the pair RDD. We're using V here to represent the values. So we don't actually do anything with the keys in this function; we only operate on the values. This is because we conceptually assume that the values have somehow already been grouped by key, and now we apply this function to reduce over the values in each of those collections. So this function, we imagine it reducing over all of the elements in the compact buffers here. Again, to make this more concrete, let's look at an example. Let's reuse the events pair RDD from the last example. Now let's say we would like to calculate the total budget per organizer over all of their organized events. Now that we have a little bit of intuition for what reduceByKey does, how might we use it to calculate the total budget per organizer, given a pair RDD where the key is the organizer and the value is the budget of some event? I'll give you a minute to figure out how to implement budgetsRDD on your own. This should be quite easy: we just call reduceByKey on the eventsRDD and we pass to it a function literal that sums adjacent values. Again, remember that reduceByKey is a transformation, so it's lazy and nothing happens after you call it, even though it looks kind of like reduce, which, if you recall, is an action on regular RDDs. But on pair RDDs, reduceByKey is a transformation. That means we have to call some kind of action after our call to reduceByKey. In this case, again, we call collect to start the computation, and then we can print out the results with a regular foreach on the returned array. And here what we can see in the results is that there are four organizations organizing events, and the total budget of each organization is shown as the second element in the returned pairs. So I hope that illustrates reduceByKey clearly for you. It's pretty important, because you're going to find it useful in your programming assignments as well. Let's look at another transformation called mapValues, and the one action I mentioned at the beginning of this session, called countByKey. Let's start with mapValues. The mapValues transformation, as its name suggests, applies a given function only to the values of a key-value pair. This is often something you want to do when you'd like to map over a pair RDD, but remember, with plain map you'd have to somehow handle the key in the function argument that you pass to it.
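As a quick sketch of what that solution might look like, reusing the eventsRDD from above (the printed totals are illustrative):

```scala
// Sum up the budgets for each organizer.
// The function passed to reduceByKey sees only values, never keys.
val budgetsRDD = eventsRDD.reduceByKey(_ + _)

// reduceByKey is lazy; collect is the action that triggers it.
budgetsRDD.collect().foreach(println)
// e.g. (Prime Sound,42000)
//      (Sport Org,36400)
```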
So you can think of this operation as a shorthand for skipping over having to deal with the keys; it's just a little bit shorter, as in this example. It has some other benefits as well, which we'll talk about in a later lecture. So, said simply, mapValues applies a function only to the values in the pair RDD and skips over the keys. It's pretty straightforward, and it's the way people typically do mapping operations on pair RDDs; they don't really use the plain map operation a lot of the time. The other operation we'll look at is countByKey, which is an action. Like its name suggests, it simply counts the number of elements per key in a pair RDD. Importantly, it returns a regular Scala Map, so nothing distributed, but a regular Scala collections Map, mapping the keys to their corresponding counts. So you then have, per key, a count of all of the values for that key. It's very simple, and remember, it's an action, just like the count operation on regular RDDs, but specialized in this case for pair RDDs. As usual, let's look at an example to make these operations a bit more concrete. Again, let's use the events pair RDD we've already used in previous slides, and this time let's use each of these operations, if we can, to compute the average budget per organizer. So now we're taking an average. I'm going to help you a little bit in the beginning, so let's do this step by step. Let's start with an intermediate RDD here. Let's calculate a pair as the value corresponding to each organizer key. That means our value is also a pair, which will represent the total budget and the total number of events that correspond to that event organizer. So this is the value here; the value is a pair itself. Its first element is the budget, and its second element is the number of events that correspond to that organizer. Okay, so first question: can we use countByKey here? Let's start with mapValues. Remember, our pair RDD called eventsRDD contains key-value pairs where the key is the organizer and the value is the budget for some event. We know we need to take a sum of the number of events, so we can use mapValues to make a new pair from our existing key-value pair that makes it easier to sum up the events per organizer later on. So for the value in the key-value pair, we make a new pair where we leave the value as it was, whatever it was before, and the second element of the resulting pair is just the number one. Just to write this out: in the eventsRDD, we have the organizer as our key and the budget as our value. This is before the mapValues. After the mapValues, what we're going to have is this: since we're using mapValues, the key stays the same, nothing happens to it, and only the value changes. So basically what we're doing here, where this b is the budget, is to say, okay, we keep the budget, whatever it is, and we add a 1 as well. So the budget and an integer 1. And this is the result of the mapValues call. Now let's use reduceByKey to return a pair RDD whose value is another pair representing an organization's total budget and the total number of events that this organization is organizing. So now, what we want is this here, total budget and total events; we want this to be the result that we get back from this reduceByKey. And this is the data that we have to start with; our data's in this shape right now. So what do I have to pass to reduceByKey here to make sure that the result ends up in this shape?
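As a sketch, that first mapValues step might look like this (the variable name intermediate follows the lecture; the pairing with 1 is exactly what's described above):

```scala
// Turn each (organizer, budget) pair into (organizer, (budget, 1)).
// mapValues leaves the key untouched and transforms only the value.
val withCounts = eventsRDD.mapValues(b => (b, 1))
```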
Try it for yourself. So remember that the function passed to reduceByKey operates only on the values of the pair RDD, right? Recall the signature of reduceByKey: it takes a function over the values only, no keys involved. And the value, in this case, is a pair, do you remember? Each of these v's is a pair, and these pairs represent a budget as the first element and just the number 1 as the second element, both of them. So that means in the result here, we take the two adjacent elements, which are each pairs themselves, and we just have to sum up the first elements of the pairs with the first elements of the adjacent ones, and the second elements with the second elements. Basically, we're summing up budgets with budgets, and the number ones with number ones. And the result type should be another pair RDD where the key remains the organizer string, and the value is a pair of integers that represents the total budget and the total number of events organized by that organization. Do you see what we've done? Both operations that we called on our pair RDD, mapValues and reduceByKey, managed to focus only on the values of our pairs. In both cases we never had to worry about messing around with the keys; we could just focus on the values associated with those keys. So does this make sense? Here we sum up the budgets, and here we sum up 1s for the total number of events. Okay, but we're not done yet. Remember, we said we wanted to compute the average budget per event organizer over all of their events. Now we finally have the information we need to compute this average, so I'll leave it to you again. Given this RDD, now called intermediate, how would you compute the average budget per organizer? So we need the average budget per event organizer now. What operations would you use? Again, try it for yourself first. So here, I've used mapValues again. Remember, mapValues focuses only on the values and not on the keys when it applies its mapping function. So I can use it to compute the average budget per organizer by simply dividing the total budget by the number of events organized by each organizer. That should now give me a pair RDD where the key still corresponds to the name of the organizer, but now the value is an integer representing the average budget per event for that organizer. And of course, just like before, I just call collect and foreach again to materialize my RDD and kick off the computation, and then I can see the results. But wait, didn't I ask you to think about using countByKey? Why didn't I use it here? Well, the answer is pretty simple. In all cases so far, I've been carrying around another pair as my value. So I've had a pair as a value, and I wanted to do something more complicated than simply counting each time: I wanted to compute an average. So in this case, it was easier for me to use mapValues and reduceByKey to get the result that I wanted. It doesn't mean that it's not possible to do this somehow via countByKey; it's just that in this case I chose mapValues and reduceByKey to get it done. Let's move on to a different transformation called keys. The signature of keys looks like this: it takes no arguments and returns a new RDD, which contains only the keys of the pair RDD on which it was called. Now, remember, this is a transformation, which means that it's lazy. At this point you might ask, well, why is that the case, when operations like collect are not?
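Putting the whole average-budget computation together, a sketch might look like this (the printed averages are illustrative, not from the lecture):

```scala
// Step 1: pair each budget with a count of 1.
// Step 2: sum budgets with budgets, counts with counts.
val intermediate = eventsRDD
  .mapValues(b => (b, 1))
  .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))

// Step 3: divide total budget by number of events, per organizer.
val avgBudgets = intermediate.mapValues {
  case (budget, numberOfEvents) => budget / numberOfEvents
}

// collect is the action that kicks off the computation.
avgBudgets.collect().foreach(println)
// e.g. (Prime Sound,42000)
//      (Sport Org,12133)
```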
They're actions, and they return an array back to the master node. In this case, keys is a transformation because the number of keys in a pair RDD could be huge. If we tried to return all the keys to the master node, like we do for collect, we could find ourselves in a situation where we overwhelm the master node and run out of memory, because we simply weren't aware that there were so many keys. This is why it's a transformation: we don't want to accidentally return an entire RDD to our master node when it doesn't fit into memory. As usual, let's look at an example to try and make the keys method a little more concrete. In this example, we're going to use this method to count the number of unique visitors to a website. So, let's say we have a case class Visitor here, with various pieces of information: the IP address of the person who visited the site, the timestamp they visited at, and the duration they visited for. Let's say we have an RDD full of instances of this Visitor type. And then here, I make a pair RDD out of it (the slides were missing this line). So here, for each visitor, we create a pair where the key is the IP address and the value is the duration. So, what methods do we call to get a number that represents the number of unique visitors? Well, we simply call keys.distinct, and then we count them up. And note that the action here is the count method, because both the keys and distinct methods are transformations. So this is how we kick off the computation here, when we call count. So, I've shown you some of the more common operations on pair RDDs. In the next session, we'll dive into joins, but I should note that beyond what I've shown you in this session, and beyond the join operations that you'll see very soon, there are many more operations available on pair RDDs. To see all of the methods that you can possibly call on pair RDDs, you can visit the PairRDDFunctions page. That's what it's called; it's a class in Spark's API documentation. This link should work for quite some time, but if for some reason the URLs change on the Spark website, all you've got to do is remember that this is a class called PairRDDFunctions, and all of the methods inside of it are the ones that you're going to find available to you on pair RDDs.
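A minimal sketch of that example, assuming an RDD of visitors named visits is already in scope (the exact field types are assumptions; the lecture doesn't spell them out):

```scala
case class Visitor(ip: String, timestamp: String, duration: String)

// Build a pair RDD: key = IP address, value = visit duration.
val visitsPairRDD = visits.map(v => (v.ip, v.duration))

// keys and distinct are both transformations; count is the action
// that actually kicks off the computation on the cluster.
val numUniqueVisits = visitsPairRDD.keys.distinct().count()
```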