Source code for pyrcv.pyrcv

"""Implementation of Single Transferable Vote."""

from __future__ import annotations

import collections
import enum
import itertools

import numpy as np
from numpy.typing import ArrayLike

from .types import PyRcvError, RaceData, RaceResult, RoundResult


[docs] class RoundMode(enum.Enum): r"""How to round a fractional vote threshold to win an election. .. math:: votes\_needed = num\_votes / (num\_candidates + 1) """ CEILING = 1 """If threshold is a fraction, use the next highest integer. Used in test of FairVote data, though technically incorrect when result is not a fraction. """ ADD_ONE_FLOOR = 2 # Very close to correct, and easy to understand. """Add one. If threshold is a fraction, use the next lowest integer. Easy to understand, as it maintains an integer threshold. More correct than :attr:`CEILING`, as it avoid ties when the result is an integer before rounding. For example, if there are 100 votes, the threshold to win should be 51 votes, not 50. """ FRACTIONAL = 3 """No rounding of fractional threshold, requires :math:`threshold + epsilon` votes. Technically, this is the most correct, but leads to lots of fractional votes. """
EPSILON = 1e-5
[docs] def run_rcv( race_data: RaceData, round_mode: RoundMode = RoundMode.ADD_ONE_FLOOR ) -> RaceResult: """Run the ranked choice voting algorithm for a single election. RCV is a method of voting in which each voter casts a single vote in the form of a ranked list. Winners are determined via an iterative algorithm that requires a winner to surpass a threshold (e.g. half the votes in a single-winner election). If no-one surpasses the threshold in an iteration, the candidate with the least votes is eliminated and ballots supporting the eliminated candidate are adjusted to the next highest ranked candidate. :param race_data: Full information about the race parameters and votes. :param round_mode: The method for rounding the vote threshold. :raise ValueError: Raised when round_mode has an unknown value, or when the ballot data has the wrong shape or contains bad values. :raise PyRcvError: Raised if an error is detected in the calcuations. :return: RCV results (winners, losers, vote transfers) for each round in the election. """ num_cands = len(race_data.metadata.names) # ballots has shape (num_ballots, max_allowed_ranking). ballots = validate_and_standardize_ballots(race_data.ballots, num_cands) # votes has shape (num_ballots, 1). votes = np.array(race_data.votes)[:, None] # The number of slots is the valid indicies on a ballot. Candidates are # numbered starting from one, and zero is reserved for empty rankings. # Example of ballot which allows 5 rankings, but has 2 empty rankings: # [4, 8, 2, 0, 0] num_slots = num_cands + 1 # weights has the same shape as ballots and describes how a vote is distributed # across multiple candidates in a ballot. It is updated in each iteration of # the RCV algorithm, with the constraint that each row (i.e. one ballot) must # sum to one. weights is initialized so that the top-ranked candidate starts # with all of the weight. weights = np.zeros_like(ballots, float) weights[:, 0] = 1 # The threshold number of votes required to win the election, rounded as # requested. votes_needed = np.sum(votes) / (race_data.metadata.num_winners + 1) if round_mode == RoundMode.CEILING: votes_needed = np.ceil(votes_needed) elif round_mode == RoundMode.ADD_ONE_FLOOR: votes_needed = np.floor(1 + votes_needed) elif round_mode == RoundMode.FRACTIONAL: votes_needed += EPSILON else: raise ValueError(f"round_mode is not a value in RoundMode enum: {round_mode}") # status is used to represent the current state of a candidate: # -1: lost the election # 0: still in the running # 1: a winner in the election status = np.zeros(num_slots, int) status[0] = -2 # slot==0 corresponds to empty marks in a ballot, and is not used. # valid is the same shape as ballots and keeps track of which candidates # on a ballot are still in the running. valid = np.ones_like(ballots, bool) # At the start of each iteration, `orig`` represents the index (on each ballot) # of the highest ranked candidate still in the race. orig = _first_nonzero(valid, axis=1) round_info = [] # Will contain the full results of each round. # Iterate until the required number of winners are found. while np.count_nonzero(status > 0) < race_data.metadata.num_winners: # The number of votes for each candidate (in ballots) is counted, using the # current weights. counts = np.bincount(ballots.ravel(), (weights * votes).ravel(), num_slots) # Reject (mask) the candidates who have already won or lost. counts_masked = np.ma.array(counts, mask=(status != 0)) # A boolean array indicating the candidate (indices) that have been elected # in this round. # Ex: [False, False, True, False, True] indicates that candidates 2 and 4 # have surpassed the vote threshold on this round. # Keep in mind that index==0 is the empty (no vote) candidate. elected_mask = counts_masked >= votes_needed # Indices of candidates elected in this round. elected = [] # Indices of candidates eliminated in this round. eliminated = [] # Candidates removed from consideration (elected or eliminated) to_remove = [] # Fraction of votes kept by those in `to_remove`. Winners keep just # `votes_needed` votes, while losers keep no votes. multipliers = [] # If the number of viable candidates (elected or still in the running) # is the required number of winners, we can just stop now and declare # the winners. if np.count_nonzero(status >= 0) == race_data.metadata.num_winners: # status == 0 are the currently "still in the running" candidates elected = np.nonzero(status == 0)[0] status[status == 0] = 1 elif elected_mask.any(): # New candidates just surpassed the vote threshold, and are removed # from the running. to_remove = np.nonzero(elected_mask)[0] elected = to_remove status[to_remove] = 1 # Newly elected candidates keep only `votes_needed` votes, and pass # on excess votes to the next highest-ranked candidate. multipliers = [votes_needed / counts_masked[c] for c in to_remove] else: # Eliminate candidate with fewest votes. min_counts = np.where(counts_masked == counts_masked.min())[0] # If multiple candidates are tied for last place, select on at random. to_remove = [np.random.choice(min_counts)] eliminated = to_remove status[to_remove] = -1 # Eliminated candidates pass on all their votes to the next # highest-ranked candidates. multipliers = [0] # Distribute votes from elected/eliminated candidates to the next active # lower ranked candidate. Elected candidates transfer excess votes, while # eliminated candidates transfer all votes. transfers = {} for cand, mult in zip(to_remove, multipliers): valid[ballots == cand] = False next = _first_nonzero(valid, axis=1) # Next active candidate on each ballot # Ballots where the candidate changed o_rows = orig != next # Ballots the candidate changed, but where no active candidate remains n_rows = o_rows & (next != -1) # Votes are transferred to next candidate. weights[n_rows, next[n_rows]] = weights[n_rows, orig[n_rows]] * (1 - mult) # Votes are transferred from original candidate. weights[o_rows, orig[o_rows]] *= mult transfers_cand = collections.defaultdict(int) slicer = n_rows, next[n_rows] for c, v in zip(ballots[slicer], (weights * votes)[slicer]): if v: transfers_cand[c] += v if transfers_cand: transfers[cand] = dict(transfers_cand) orig = next # Sanity check that the algorithm didn't misplace any votes. Should not happen. if not np.isclose(counts.sum(), votes.sum()): # pragma: no cover raise PyRcvError("Final round count total does not equal original votes") round_info.append( RoundResult(counts.tolist(), list(elected), list(eliminated), transfers) ) return RaceResult(race_data.metadata, round_info)
def _first_nonzero(arr: ArrayLike, axis: int, invalid_val=-1) -> np.ndarray: """Returns the indices of the first non-zero value along an axis.""" mask = arr != 0 return np.where(mask.any(axis=axis), mask.argmax(axis=axis), invalid_val)
[docs] def validate_and_standardize_ballots( ballots: list[list[int]], num_cands: int ) -> np.ndarray: """Flush out ballots into a 2-d array using zeros, and check for invalid votes. :param ballots: List of ballots, which are lists of ints :param num_cands: Number of candidates in the election. :return: 2-d array of ballot data """ # Fill out ballots to be the same length. def isiter(lst): return isinstance(lst, (list, tuple)) if not isiter(ballots) or not all(isiter(lst) for lst in ballots): raise ValueError("Ballot data are not list of lists") if not all(isinstance(el, int) for lst in ballots for el in lst): raise ValueError("Ballot data are not all integers") ballots_flush = list(itertools.zip_longest(*ballots, fillvalue=0)) ballots = np.array(ballots_flush, dtype=np.int8).T # Validate all rankings are for valid candidate indices. check_oob(ballots, num_cands) # Check there are no duplicate rankings in a ballot. np.apply_along_axis(check_dup_1d, axis=1, arr=ballots) # Add a 0 at the end of every ballot. return np.pad(ballots, ((0, 0), (0, 1)))
[docs] def check_oob(ballots: ArrayLike, num_cands: int): """Checks that ballot indices are positive and <=num_cands. :param ballots: List of ballots, which are lists of ints :param num_cands: Number of candidates in the election. :raises ValueError: If invalid indices are present, their positions are given in the error message. """ ballots = np.asarray(ballots) oob_rankings = (ballots < 0) | (ballots > num_cands) if oob_rankings.any(): bad_ballots = oob_rankings.any(axis=1) bad_indices = np.nonzero(bad_ballots)[0].tolist() raise ValueError(f"Bad value(s) on ballots: {bad_indices}")
[docs] def check_dup_1d(ballot: ArrayLike) -> bool: """Checks if there are duplicate candidate indicies in a ballot. Note that zero constitutes no mark, and duplicate zeros are allowed. :param ballot: Ranking of candidate indices, e.g. ``[4,3,1,0,0]`` :raises ValueError: If duplicate candidate indices present in ballot. :return: True if duplicate candidate indices present in ballot, False otherwise. """ unique, counts = np.unique(ballot, return_counts=True) has_dup = (counts[unique != 0] > 1).any() if has_dup: raise ValueError(f"Ballot has duplicated entry: {ballot.tolist()}") return has_dup