Pyspark learning notes column series contents
- Pyspark learning notes (I) - Preface and contents
- Pyspark learning notes (II) - spark-submit command
- Pyspark learning notes (III) - SparkContext and SparkSession
- Pyspark learning notes (IV) - Resilient Distributed Dataset RDD (I)
- Pyspark learning notes (IV) - Resilient Distributed Dataset RDD (II)
- Pyspark learning notes (V) - RDD operations (I): RDD transformation operations
- Pyspark learning notes (V) - RDD operations (II): RDD action operations
Article catalogue
- Pyspark learning notes column series contents
- Pyspark learning notes (V) - RDD operations (II): RDD action operations
- Preface
- Main reference links
- I. Introduction to PySpark RDD actions
- II. Common action operations & usage examples
Preface
Tip: this blog post covers the action operations among the RDD operations, i.e. RDD Actions.
Main reference links:
1. PySpark RDD Actions with examples
2. Apache Spark Python API
I. Introduction to PySpark RDD actions
PySpark RDD actions are operations that return values to the driver. An action triggers the execution of all the preceding transformations: only when the program reaches an action are the transformations recorded in the RDD lineage actually computed, and the final result is then produced by that action.
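A minimal sketch of this lazy-evaluation behaviour (not from the original post; it assumes a SparkSession named spark, as in the later examples):

# The transformation only records the lineage; nothing is computed yet
lazy_rdd = spark.sparkContext.parallelize([1, 2, 3, 4]).map(lambda x: x * 2)
# The action count() triggers the actual computation of the lineage above
print(lazy_rdd.count())
# out
4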
II. Common action operations & usage examples
0. Initial example RDD
We still use rdd_test from the previous blog post as the example, which connects better with the content covered earlier:
[((10,1,2,3), (10,1,2,4), (10,1,2,4), (20,2,2,2), (20,1,2,3))]
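For reference, here is a minimal sketch of how an RDD with this content could be created (the previous post may have built it differently; it assumes a SparkSession named spark):

# a single element: one tuple that itself contains five 4-tuples
rdd_test = spark.sparkContext.parallelize(
    [((10,1,2,3), (10,1,2,4), (10,1,2,4), (20,2,2,2), (20,1,2,3))]
)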
1.count()
This operation takes no parameters and returns a long value representing the number of elements in the RDD (pyspark.RDD.count). Here we simply test the difference between rdd_test after map and after flatMap.
# the example of count
rdd_map_test = rdd_test.map(lambda x: x)
print("count_test1\n", rdd_map_test.count())
# out
1
# the example of count
rdd_flatmap_test = rdd_test.flatMap(lambda x: x)
print("count_test2\n", rdd_flatmap_test.count())
# out
5
The analysis is as follows: map does not remove the nesting, so the list still contains a single (5, 4) two-dimensional tuple of tuples; flatMap removes one layer of nesting, which leaves five (4,) one-dimensional tuples.
2.collect()
Returns a list of all the elements in the RDD (note that there is no limit on the number of elements returned, so be careful with large RDDs). This action needs no separate example here: in fact, the results of the transformation operations in the previous post were all displayed through collect. pyspark.RDD.collect
3.take(num)
Returns the first num elements of the RDD, in no particular order (this method should only be used when the expected result array is small, because all the data is loaded into the driver's memory). pyspark.RDD.take
# the example of take
print("take_test\n", rdd_flatmap_test.take(3))
# out
[(10,1,2,3), (20,2,2,2), (20,1,2,3)]
4.takeOrdered(num, key=None)
Returns the first num elements of the RDD in ascending order, or in the order defined by the optional key function (this method should only be used when the expected result array is small, because all the data is loaded into the driver's memory). pyspark.RDD.takeOrdered
# the example of takeOrdered
print("takeOrdered_test_1\n", flat_rdd_test.takeOrdered(3))
print("takeOrdered_test_2\n", flat_rdd_test.takeOrdered(3, key=lambda x: x[3]))
# out
[(10,1,2,3), (10,1,2,4), (10,1,2,4)]  # sorted by the child tuples' natural order by default
[(20,2,2,2), (10,1,2,3), (20,1,2,3)]  # here the order is determined by the number at index [3] of each child tuple
5.takeSample(withReplacement, num, seed=None)
Returns a fixed-size sampled subset of this RDD (this method should only be used when the expected result array is small, because all the data is loaded into the driver's memory). pyspark.RDD.takeSample
print("takeOrdered_test_1\n",flat_rdd_test.takeSample(False, 1, 1)) [(10,1,2,4)] print("takeOrdered_test_1\n",flat_rdd_test.takeSample(False, 3, 1)) [(10,1,2,4), (20,1,2,3), (10,1,2,4)] print("takeOrdered_test_1\n",flat_rdd_test.takeSample(False, 10, 1)) [(10,1,2,4), (20,1,2,3), (10,1,2,4), (20,2,2,2), (10,1,2,3)]
6.top(num, key=None)
Returns the first num elements of the RDD in descending order; the ordering is determined by the element type, or by the optional key function (this method should only be used when the expected result array is small, because all the data is loaded into the driver's memory). pyspark.RDD.top
print("top_test\n",flat_rdd_test.top(3)) [(20,2,2,2), (20,1,2,3), (10,1,2,4)]
7.first()
Returns the first element of the RDD, regardless of element order. pyspark.RDD.first
print("first_test\n",flat_rdd_test.first(3)) [(10,1,2,3)]
8.reduce(f)
Reduces all the elements of the RDD using the specified operator, which must satisfy the commutative and associative laws; in general you can pass an anonymous function that takes two inputs, lambda x, y: ....
print("reduce_test\n",flat_rdd_test.reduce(lambda x, y: x+y)) [(10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2,20,1,2,3)]
The process works as follows:
step_0: [(10,1,2,3), (10,1,2,4), (10,1,2,4), (20,2,2,2), (20,1,2,3)]
step_1: (10,1,2,3) => x; (10,1,2,4) => y
        x + y => (10,1,2,3) + (10,1,2,4) => (10,1,2,3,10,1,2,4)
step_2: (10,1,2,3,10,1,2,4) => x; (10,1,2,4) => y
        x + y => (10,1,2,3,10,1,2,4) + (10,1,2,4) => (10,1,2,3,10,1,2,4,10,1,2,4)
step_3: (10,1,2,3,10,1,2,4,10,1,2,4) => x; (20,2,2,2) => y
        x + y => (10,1,2,3,10,1,2,4,10,1,2,4) + (20,2,2,2) => (10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2)
step_4: (10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2) => x; (20,1,2,3) => y
        x + y => (10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2) + (20,1,2,3) => (10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2,20,1,2,3)
9.foreach(f)
Applies a named or anonymous function to every element of the RDD. It is similar to map, but because foreach is an action it can execute side-effect functions such as print. pyspark.RDD.foreach
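The post gives no code for this one, so here is a minimal sketch (the helper name print_element is illustrative; note that in cluster mode the print output appears on the executors rather than the driver):

# foreach applies the function to each element for its side effects and returns None
def print_element(elem):
    print(elem)

flat_rdd_test.foreach(print_element)
# the same with an anonymous function
flat_rdd_test.foreach(lambda elem: print(elem))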
10.countByValue()
Returns the count of each unique value in this RDD as a dictionary of (unique_value, count) pairs. pyspark.RDD.countByValue
print("top_test\n",flat_rdd_test.countByValue().items() ) [((10,1,2,3),1), ((20,1,2,3),1), ((20,2,2,2),1), ((10,1,2,4),2)]
11.fold(zeroValue, func)
Aggregates the elements of each partition of the RDD using the given func and the initial value zeroValue, and then aggregates the per-partition results again. The aggregation process is similar to reduce, but the operator does not have to satisfy the commutative law. One detail to note: fold applies zeroValue within every partition when aggregating, not just once.
'''
① When folding inside each partition: initial value zeroValue + the RDD elements of that partition
② After the aggregate value of each partition is obtained, these values are aggregated again, and zeroValue is applied once more;
③ So the result is: zeroValue * (partition_num + 1) + the aggregate of the RDD elements
'''
Examples are as follows:
rdd_2 = spark.sparkContext.parallelize(['A_a#', 'B_b#', 'C_c#', 'D_d#'], 1)
rdd_3 = spark.sparkContext.parallelize(['A_a#', 'B_b#', 'C_c#', 'D_d#'], 4)
print('fold_test_2', rdd_2.fold('zeroV$_', lambda x, y: x + y))
print('fold_test_3', rdd_3.fold('zeroV$_', lambda x, y: x + y))
Since rdd_2 has only 1 partition, the initial value appears only twice:
'zeroV$_zeroV$_A_a#B_b#C_c#D_d#'
Since rdd_3 has 4 partitions, the initial value appears 5 times:
'zeroV$_zeroV$_A_a#zeroV$_B_b#zeroV$_C_c#zeroV$_D_d#'
Then run the same experiment on flat_rdd_test:
print("fold_test", flat_rdd_test.repartition(1).fold(('Hello','World'), lambda x,y: x+y)) ('Hello','World','Hello','World',10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2,20,1,2,3)
12.aggregate(zeroValue, seqOp, combOp)
Aggregates each partition using the given functions and the initial value, and then merges the per-partition results (the initial value is applied per partition, following the same rule as in fold). seqOp operates on the elements within each partition first; combOp then merges the aggregation results of all partitions into the final value. pyspark.RDD.aggregate
rdd_agg_test = spark.sparkContext.parallelize([1,2,3,10,20,30,7,8,9], 3)
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
result_rdd = rdd_agg_test.aggregate((100,1000), seqOp, combOp)
# out
(490, 4009)
seqOp:
partition_1: 100 + 1 + 2 + 3, 1000 + 1 + 1 + 1 => (106, 1003)
partition_2: 100 + 10 + 20 + 30, 1000 + 1 + 1 + 1 => (160, 1003)
partition_3: 100 + 7 + 8 + 9, 1000 + 1 + 1 + 1 => (124, 1003)
combOp:
100 + 106 + 160 + 124, 1000 + 1003 + 1003 + 1003 => (490, 4009)
So far, the common action operations have all been introduced.