This blog contains two Python implementations of the SELECT algorithm, which selects the k-th item in sorted order from an unsorted array of N items in O(N) time. They derive from the discussion of RANDOMIZED-SELECT and SELECT in CLRS “Introduction to Algorithms, 3rd Edition” on pages 215-222 and from Professor David Eppstein’s lecture notes for “ICS 161: Design and Analysis of Algorithms” from January 30th, 1996.
Contents
- Overview
- Implementation of select_random_pivot
- Implementation of select_median_of_medians_pivot
- Implementation of get_top_n_items
Overview
Both implementations use a divide and conquer approach: they recursively partition the input array around a pivot value into smaller and smaller chunks, where each chunk contains items that honor an ordering relation to the pivot, until the k-th item matches the pivot. There are many great discussions of how this works on the web and in textbooks (like CLRS). My goal here is not to improve on those discussions (I can’t). Instead, it is simply to show two working implementations.
The first algorithm (select_random_pivot) uses a random number to select the pivot point. The second algorithm (select_median_of_medians_pivot) uses a deterministic approach to find a better approximation of the median.
The select_random_pivot algorithm has an average run time of O(N) but a worst case run time of O(N^2), although the worst case is vanishingly rare in practice. The select_median_of_medians_pivot algorithm has both an average and a worst case run time of O(N).
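As a concrete illustration of a single partition step, here is a simplified sketch using list comprehensions (the actual implementations below build the three lists in one pass over the array):

# Select the item at index k = 2 of the sorted order from an
# unsorted list by partitioning it around a pivot value.
array = [7, 1, 5, 3, 9]
pivot = 5
array_lt = [x for x in array if x < pivot]   # [1, 3]
array_eq = [x for x in array if x == pivot]  # [5]
array_gt = [x for x in array if x > pivot]   # [7, 9]

# k = 2 is past array_lt (length 2) but within array_eq, so the
# answer is the pivot itself; otherwise we would recurse into
# array_lt or array_gt with an adjusted k.
k = 2
assert sorted(array)[k] == pivot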
These implementations were written to help me understand how the algorithms work; they were not designed for production use. For example, they do not do any bounds checking, and they create sub-arrays during recursion instead of operating on a single array in place. Operating in place would probably be substantially faster, but manipulating indices would, in my opinion, detract from the readability.
For a production environment you would want to create several implementations, test them against characteristic datasets and choose the one that performs the best. When I tested these two implementations on a series of datasets, the select_random_pivot function won: it was at least 2X faster in every case.
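One minimal way you might run such a comparison is sketched below. The compare_selects helper is hypothetical and not part of the original post; it simply times each function on identical random input.

import random
import time

def compare_selects(select_functions, sizes=(1000, 10000, 100000)):
    '''
    Hypothetical helper (not part of the original post): time each
    select function on the same random data and print the elapsed
    seconds so the implementations can be compared directly.
    '''
    for size in sizes:
        array = [random.randint(1, 1000) for _ in range(size)]
        k = size // 2
        for fct in select_functions:
            start = time.perf_counter()
            fct(list(array), k)  # pass a copy so each run sees identical input
            elapsed = time.perf_counter() - start
            print(f'{fct.__name__:<35} n={size:<8} {elapsed:.4f}s')

# Example (assumes the two functions defined later in this post):
# compare_selects([select_random_pivot, select_median_of_medians_pivot])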
To get a feel for how these algorithms might be used, I have included an example that shows how to get the top n ordered items from an unordered array in O(N) time using the get_top_n_items function.
I hope that you find this helpful.
Implementation of select_random_pivot
The first implementation randomly chooses the pivot point. It is an implementation of the random select algorithm as described in CLRS “Introduction to Algorithms, 3rd Edition” on page 216, modified to partition the items into separate arrays.
The average run time is O(N) and the worst case run time is O(N^2), where N is the number of entries in the array, but the probability of the worst case occurring in practice is vanishingly small.
import random  # needed for the random pivot selection


def select_random_pivot(array, k):
    '''
    Implementation of the random select algorithm as described in
    CLRS "Introduction to Algorithms, 3rd Edition" on page 216
    modified to partition the items in separate lists.

    This algorithm has a worst case run time of O(N^2) where N is
    the number of entries in the array but the probability of that
    occurring is vanishingly small. The average case is O(N). It is
    normally much faster than select_median_of_medians_pivot.

    Here is how you might use it:

        # Create a list of pseudo random numbers.
        # Duplicates can occur.
        num = 10000
        array = [random.randint(1, 1000) for i in range(num)]
        random.shuffle(array)
        random.shuffle(array)

        # Get the value of the kth item.
        k = 7
        kval = select_random_pivot(array, k)

        # Test it.
        sorted_array = sorted(array)
        assert sorted_array[k] == kval

    @param array  the list of values
    @param k      k-th item to select
    @returns the k-th item
    '''
    # If the array is short, terminate the recursion and return the
    # value without partitioning.
    if len(array) <= 10:
        array.sort()
        return array[k]

    # Randomly choose a pivot point.
    # In vanishingly rare cases this can result in O(N^2).
    pivot_idx = random.randint(0, len(array) - 1)
    pivot = array[pivot_idx]  # pivot point value (not index)

    # Now select recursively using the pivot.
    # At this point we have the pivot. Use it to partition the input
    # array into 3 categories: items that are less than the pivot
    # value (array_lt), items that are greater than the pivot value
    # (array_gt) and items that are exactly equal to the pivot value
    # (array_eq).
    array_lt = []
    array_gt = []
    array_eq = []
    for item in array:
        if item < pivot:
            array_lt.append(item)
        elif item > pivot:
            array_gt.append(item)
        else:
            array_eq.append(item)

    # The array values have been partitioned according to their
    # relation to the pivot value. The partitions look like this:
    #
    #   +---+---+---+...+---+---+---+...+---+---+---+...
    #   | 0 | 1 | 2 |   | e |e+1|e+2|   | g |g+1|g+2|
    #   +---+---+---+...+---+---+---+...+---+---+---+...
    #     array_lt          array_eq        array_gt
    #
    # If the value of k is in the range [0..e) then we know that
    # the desired value is in array_lt so we need to recurse.
    #
    # If the value of k is in the range [e..g) then we know that the
    # desired value is in array_eq and we are done.
    #
    # If the value of k is >= g then the desired value is in
    # array_gt and we need to recurse but we also have to make sure
    # that k is normalized with respect to array_gt so that it has the
    # proper offset in the recursion. We normalize it by subtracting
    # len(array_lt) and len(array_eq).
    if k < len(array_lt):
        return select_random_pivot(array_lt, k)
    elif k < len(array_lt) + len(array_eq):
        return array_eq[0]
    else:
        normalized_k = k - (len(array_lt) + len(array_eq))
        return select_random_pivot(array_gt, normalized_k)
Implementation of select_median_of_medians_pivot
This implementation chooses a pivot point deterministically to reduce the worst case run time to O(N). It is an implementation of the Blum, Floyd, Pratt, Rivest, Tarjan SELECT algorithm as described by David Eppstein. This algorithm is also discussed in CLRS “Introduction to Algorithms, 3rd Edition” on page 220.
This algorithm has a worst case run time of O(N), where N is the number of entries in the array.
Although this algorithm has better worst case performance than select_random_pivot, select_random_pivot is still preferred because it is much faster in practice.
def select_median_of_medians_pivot(array, k):
    '''
    Implementation of the Blum, Floyd, Pratt, Rivest, Tarjan SELECT
    algorithm as described by David Eppstein.

    CITATION: http://www.ics.uci.edu/~eppstein/161/960130.html

    This algorithm has a worst case run time of O(N) where N is the
    number of entries in the array.

    Although this algorithm has better worst case performance than
    select_random_pivot(), that algorithm is preferred because it is
    much faster in practice.

    Here is how you might use it:

        # Create a list of pseudo random numbers.
        # Duplicates can occur.
        num = 10000
        array = [random.randint(1, 1000) for i in range(num)]
        random.shuffle(array)
        random.shuffle(array)

        # Get the value of the kth item.
        k = 7
        kval = select_median_of_medians_pivot(array, k)

        # Test it.
        sorted_array = sorted(array)
        assert sorted_array[k] == kval

    @param array  the list of values
    @param k      k-th item to select
    '''
    # If the array is short, terminate the recursion and return the
    # value without partitioning.
    if len(array) <= 10:
        array.sort()
        return array[k]

    # Partition the array into subsets with a maximum of 5 elements
    # each.
    subset_size = 5  # max items in a subset
    subsets = []     # list of subsets
    num_medians = len(array) // subset_size
    if (len(array) % subset_size) > 0:
        num_medians += 1  # not divisible by 5
    for i in range(num_medians):
        beg = i * subset_size
        end = min(len(array), beg + subset_size)
        subset = array[beg:end]
        subsets.append(subset)

    # Find the median of each subset.
    # Note that this calls select_median_of_medians_pivot() recursively,
    # taking advantage of the fact that for len(array) <= 10 the select
    # operation simply sorts the array and returns the k-th item. The
    # sort could be done here but since the termination condition is
    # required to avoid an infinite loop we may as well use it.
    medians = []  # list of medians
    for subset in subsets:
        median = select_median_of_medians_pivot(subset, len(subset) // 2)
        medians.append(median)

    # Now get the median of the medians recursively.
    # Assign it to the local pivot variable because
    # the pivot handling code is the same regardless
    # of how it was generated. See select_random_pivot() for
    # a different approach for generating the pivot.
    median_of_medians = select_median_of_medians_pivot(medians, len(medians) // 2)
    pivot = median_of_medians  # pivot point value (not index)

    # Now select recursively using the pivot.
    # At this point we have the pivot. Use it to partition the input
    # array into 3 categories: items that are less than the pivot
    # value (array_lt), items that are greater than the pivot value
    # (array_gt) and items that are exactly equal to the pivot value
    # (array_eq).
    array_lt = []
    array_gt = []
    array_eq = []
    for item in array:
        if item < pivot:
            array_lt.append(item)
        elif item > pivot:
            array_gt.append(item)
        else:
            array_eq.append(item)

    # The array values have been partitioned according to their
    # relation to the pivot value. The partitions look like this:
    #
    #   +---+---+---+...+---+---+---+...+---+---+---+...
    #   | 0 | 1 | 2 |   | e |e+1|e+2|   | g |g+1|g+2|
    #   +---+---+---+...+---+---+---+...+---+---+---+...
    #     array_lt          array_eq        array_gt
    #
    # If the value of k is in the range [0..e) then we know that
    # the desired value is in array_lt so we need to recurse.
    #
    # If the value of k is in the range [e..g) then we know that the
    # desired value is in array_eq and we are done.
    #
    # If the value of k is >= g then the desired value is in
    # array_gt and we need to recurse but we also have to make sure
    # that k is normalized with respect to array_gt so that it has the
    # proper offset in the recursion. We normalize it by subtracting
    # len(array_lt) and len(array_eq).
    if k < len(array_lt):
        return select_median_of_medians_pivot(array_lt, k)
    elif k < len(array_lt) + len(array_eq):
        return array_eq[0]
    else:
        normalized_k = k - (len(array_lt) + len(array_eq))
        return select_median_of_medians_pivot(array_gt, normalized_k)
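If you want to convince yourself that both functions agree with a full sort, a quick sanity check along these lines can be run after defining them. This check is a sketch and is not part of the original listings:

import random

# Cross-check both select functions against sorted() for several
# random arrays and several values of k.
for trial in range(5):
    array = [random.randint(1, 1000) for _ in range(5000)]
    expected = sorted(array)
    for k in random.sample(range(len(array)), 10):
        assert select_random_pivot(list(array), k) == expected[k]
        assert select_median_of_medians_pivot(list(array), k) == expected[k]
print('all checks passed')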
Implementation of get_top_n_items
This function shows how to get the top n ordered items from an unordered array using the select_random_pivot function. It demonstrates how to use the SELECT algorithms described here.
def get_top_n_items(array, n):
    """
    Get the top n items.
    """
    nth = select_random_pivot(array, n)  # avg case: O(N), WC: O(N^2)
    top_items = []

    # Start by getting all of the values less than the top value
    # (O(N)). This guarantees that we don't miss some because there
    # are duplicate nth values.
    for value in array:
        if value < nth:
            top_items.append(value)
            if len(top_items) == n:
                return top_items  # all done

    # Now get the remaining values.
    # This is where duplicates are handled.
    for value in array:
        if value == nth:
            top_items.append(value)
            if len(top_items) == n:
                return top_items  # all done

    # We should never reach this point.
    assert len(top_items) == n
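Here is a small usage sketch (the sizes and values are arbitrary). It fetches the n smallest values, which is what this post means by the top n items, and verifies them against a full sort. Note that the returned list is not necessarily sorted itself.

import random

array = [random.randint(1, 100) for _ in range(1000)]
n = 5
top = get_top_n_items(list(array), n)

# The result contains the first n values in sorted order, possibly
# in a different order and possibly with duplicates.
assert sorted(top) == sorted(array)[:n]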