Source code for pyrcv.transform

"""Utilities to convert a csv format from Google Forms into pyrcv standard format."""

import re
from typing import Tuple

import numpy as np
import pandas as pd
from pandas._typing import FilePath, ReadCsvBuffer

from .types import PyRcvError, RaceData, RaceMetadata

QUESTION_PATTERN = re.compile(r"^(?P<question>.*?)" r"\s*? " r"\[(?P<option>.*)\]$")

WINNERS_PATTERN = re.compile(r"\((?P<num_winners>\d+)\s+(winners?|WINNERS?|Winners?)\)")


[docs] def parse_google_form_csv( buffer: FilePath | ReadCsvBuffer[bytes] | ReadCsvBuffer[str], ) -> list[RaceData]: """Parses race and ballot info from Googe Form CSV results file. The required format is one header line, followed by one line for each ballot. The column headers are parsed to determine the races and candidates. For example, the following represents two races, one with 3 candidates and one with 2 candidates: Mayor [Abe], Mayor [Betty], Mayor [Chris], Police Chief [Alice], Police Chief [Bob] Each ballot should provide a numerical ranking within each race. A ballot cannot contain duplicate values within a single race. Using the example above, some valid ballots are: 1, 2, 1, 2, 3 # Can use raw numbers 1st, 2nd, 1st, 2nd, 3rd # Can use ordinals 1st, 2nd,,,, 1st # Does not rank Chris or Alice for Police Chief. 2nd,, 1st, 2nd, 3rd # Gap in ranking Mayoral race. Gaps are OK. An example invalid ballot would be: 1, 1, 2, 1, 3 # Duplicate ranking in Mayor race. :param buffer: CSV-parseable data in the format described above. :return: List containing an entry for each race parsed from the CSV file. """ df = pd.read_csv(buffer) race_infos = [] weights = df["weight"].values if "weight" in df else None for metadata, slice_ in parse_header(df.columns): goog = df.iloc[:, slice_].map(_coerce).values goog = np.ma.array(goog, mask=(goog == 0)) argsort = goog.argsort(axis=1) mask = np.take_along_axis(goog.mask, argsort, axis=1) ballots = np.ma.array(argsort, mask=mask) + 1 ballots = ballots.filled(0) if weights is not None: ballots = np.repeat(ballots, weights, axis=0) ballots, votes = np.unique(ballots, axis=0, return_counts=True) race_infos.append(RaceData(metadata, ballots.tolist(), votes.tolist())) return race_infos
[docs] def parse_header(header: list[str]) -> list[Tuple[RaceMetadata, slice]]: """Parse header row list into metadata and a column slices for each race. The Google Form header pattern is the race, followed by one of the options for that race in brackets. Adjacent columns for the same race have the same text, but different options. There also can be an optional parenthetical indicating the number of winners is allowed between the race and the option; if this parenthetical is missing, the race is assumed to be single-winner. :param header: Header row from CSV file :return: List of tuples, one for each race. The tuple contains :class:`RaceMetadata` and a slice indicating the columns corresponding to the options for the race. Examples header values: * ``What is your favorite season? [Spring]`` * ``City Council (4 winners) [Darth Vader]`` * ``Mayor (1 winner) [Luke Skywalker]`` """ current_question = None current_options = [] questions = [] options = [] num_winners_list = [] starts = [] ends = [] for col_idx, col in enumerate(header): question_match = QUESTION_PATTERN.match(col) if question_match: question = question_match.group("question").strip() option = question_match.group("option").strip() if question != current_question: if current_question is not None: ends.append(col_idx) options.append(current_options) current_options = [] winners_match = WINNERS_PATTERN.search(question) if winners_match: num_winners = int(winners_match.group("num_winners")) else: num_winners = 1 questions.append(question) num_winners_list.append(num_winners) starts.append(col_idx) current_question = question current_options.append(option) else: if current_question is not None: current_question = None ends.append(col_idx) options.append(current_options) current_options = [] if current_question is not None: ends.append(col_idx + 1) options.append(current_options) num_questions = len(questions) assert len(options) == num_questions, options assert len(starts) == num_questions, starts assert len(ends) == num_questions, ends return [ (RaceMetadata(q, w, o), slice(s, e)) for q, w, o, s, e in zip(questions, num_winners_list, options, starts, ends) ]
def _coerce(x): if isinstance(x, int): return x if isinstance(x, float): if np.isnan(x): return 0 else: raise ValueError(f"Cannot use float for candidate index: {x}") numbers = re.findall(r"(\d+)[st|nd|rd|th]?", x) if not numbers or len(numbers) > 1: raise PyRcvError(f"Could not determine number: {x}") return int(numbers[0])