Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/mypy-types-cleanlab-internal-utils #608

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
120 changes: 84 additions & 36 deletions cleanlab/internal/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,21 @@

import warnings
import numpy as np
import numpy.typing as npt
import pandas as pd
from typing import Union, Tuple
from typing import Union, Tuple, TypeVar, Optional, Callable, List, Any, Dict, Set

from cleanlab.typing import DatasetLike, LabelLike
from cleanlab.internal.validation import labels_to_array
from cleanlab.internal.constants import FLOATING_POINT_COMPARISON, TINY_VALUE


def remove_noise_from_class(noise_matrix, class_without_noise) -> np.ndarray:
T = TypeVar("T", bound=npt.NBitBase)


def remove_noise_from_class(
noise_matrix: npt.NDArray["np.floating[T]"], class_without_noise: int
) -> npt.NDArray["np.floating[T]"]:
"""A helper function in the setting of PU learning.
Sets all P(label=class_without_noise|true_label=any_other_class) = 0
in noise_matrix for pulearning setting, where we have
Expand All @@ -50,7 +56,7 @@ class of choosing, denoted by class_without_noise.
K = len(noise_matrix)

cwn = class_without_noise
x = np.copy(noise_matrix)
x = np.copy(noise_matrix) # type: ignore[no-untyped-call]

# Set P( labels = cwn | y != cwn) = 0 (no noise)
x[cwn, [i for i in range(K) if i != cwn]] = 0.0
Expand All @@ -63,7 +69,7 @@ class of choosing, denoted by class_without_noise.
return x


def clip_noise_rates(noise_matrix) -> np.ndarray:
def clip_noise_rates(noise_matrix: npt.NDArray["np.floating[T]"]) -> npt.NDArray["np.floating[T]"]:
"""Clip all noise rates to proper range [0,1), but
do not modify the diagonal terms because they are not
noise rates.
Expand All @@ -78,16 +84,18 @@ def clip_noise_rates(noise_matrix) -> np.ndarray:
Diagonal terms are not noise rates, but are consistency P(label=k|true_label=k)
Assumes columns of noise_matrix sum to 1"""

def clip_noise_rate_range(noise_rate) -> float:
def clip_noise_rate_range(noise_rate: float) -> float:
"""Clip noise rate P(label=k'|true_label=k) or P(true_label=k|label=k')
into proper range [0,1)"""
return min(max(noise_rate, 0.0), 0.9999)

# Vectorize clip_noise_rate_range for efficiency with np.ndarrays.
vectorized_clip = np.vectorize(clip_noise_rate_range)
vectorized_clip: Callable[
[npt.NDArray["np.floating[T]"]], npt.NDArray["np.floating[T]"]
] = np.vectorize(clip_noise_rate_range)

# Preserve because diagonal entries are not noise rates.
diagonal = np.diagonal(noise_matrix)
diagonal: npt.NDArray["np.floating[T]"] = np.diagonal(noise_matrix)

# Clip all noise rates (efficiently).
noise_matrix = vectorized_clip(noise_matrix)
Expand All @@ -100,7 +108,12 @@ def clip_noise_rate_range(noise_rate) -> float:
return noise_matrix


def clip_values(x, low=0.0, high=1.0, new_sum=None) -> np.ndarray:
def clip_values(
x: npt.NDArray["np.floating[T]"],
low: float = 0.0,
high: float = 1.0,
new_sum: Optional[float] = None,
) -> npt.NDArray["np.floating[T]"]:
"""Clip all values in p to range [low,high].
Preserves sum of x.

Expand All @@ -123,11 +136,13 @@ def clip_values(x, low=0.0, high=1.0, new_sum=None) -> np.ndarray:
x : np.ndarray
A list of clipped values, summing to the same sum as x."""

def clip_range(a, low=low, high=high):
def clip_range(a: float, low: float = low, high: float = high) -> float:
"""Clip a into range [low,high]"""
return min(max(a, low), high)

vectorized_clip = np.vectorize(
vectorized_clip: Callable[
[npt.NDArray["np.floating[T]"]], npt.NDArray["np.floating[T]"]
] = np.vectorize(
clip_range
) # Vectorize clip_range for efficiency with np.ndarrays
prev_sum = sum(x) if new_sum is None else new_sum # Store previous sum
Expand All @@ -138,7 +153,12 @@ def clip_range(a, low=low, high=high):
return x


def value_counts(x, *, num_classes=None, multi_label=False) -> np.ndarray:
def value_counts(
x: Union[List[Any], npt.NDArray[Union[np.int_, np.str_]]],
*,
num_classes: Optional[int] = None,
multi_label: bool = False,
) -> npt.NDArray[np.int_]:
"""Returns an np.ndarray of shape (K, 1), with the
value counts for every unique item in the labels list/array,
where K is the number of unique entries in labels.
Expand Down Expand Up @@ -180,13 +200,18 @@ def value_counts(x, *, num_classes=None, multi_label=False) -> np.ndarray:
raise ValueError(f"Required: num_classes > max(x), but {num_classes} <= {max(x)}.")
# Add zero counts for all missing classes in [0, 1,..., num_classes-1]
# multi_label=False regardless because x was flattened.
missing_classes = get_missing_classes(x, num_classes=num_classes, multi_label=False)
missing_counts = [(z, 0) for z in missing_classes]
missing_classes: List[int] = get_missing_classes(x, num_classes=num_classes, multi_label=False)
missing_counts: List[Tuple[int, int]] = [(z, 0) for z in missing_classes]
# Return counts with zeros for all missing classes.
return np.array(list(zip(*sorted(list(zip(unique_classes, counts)) + missing_counts)))[1])


def value_counts_fill_missing_classes(x, num_classes, *, multi_label=False) -> np.ndarray:
def value_counts_fill_missing_classes(
x: Union[List[Any], npt.NDArray[Union[np.int_, np.str_]]],
num_classes: int,
*,
multi_label: bool = False,
) -> npt.NDArray[np.int_]:
"""Same as ``internal.util.value_counts`` but requires that num_classes is provided and
always fills missing classes with zero counts.

Expand All @@ -195,7 +220,13 @@ def value_counts_fill_missing_classes(x, num_classes, *, multi_label=False) -> n
return value_counts(x, num_classes=num_classes, multi_label=multi_label)


def get_missing_classes(labels, *, pred_probs=None, num_classes=None, multi_label=False):
def get_missing_classes(
labels: LabelLike,
*,
pred_probs: Optional[npt.NDArray["np.floating[T]"]] = None,
num_classes: Optional[int] = None,
multi_label: bool = False,
) -> List[int]:
"""Find which classes are present in ``pred_probs`` but not present in ``labels``.

See ``count.compute_confident_joint`` for parameter docstrings."""
Expand All @@ -209,7 +240,9 @@ def get_missing_classes(labels, *, pred_probs=None, num_classes=None, multi_labe
return sorted(set(range(num_classes)).difference(unique_classes))


def round_preserving_sum(iterable) -> np.ndarray:
def round_preserving_sum(
iterable: Union[List[float], npt.NDArray["np.floating[T]"]]
) -> Union[List[int], npt.NDArray[np.int_]]:
"""Rounds an iterable of floats while retaining the original summed value.
The name of each parameter is required. The type and description of each
parameter is optional, but should be included if not obvious.
Expand Down Expand Up @@ -244,7 +277,9 @@ def round_preserving_sum(iterable) -> np.ndarray:
return ints.astype(int)


def round_preserving_row_totals(confident_joint) -> np.ndarray:
def round_preserving_row_totals(
confident_joint: npt.NDArray["np.floating[T]"],
) -> npt.NDArray[np.int_]:
"""Rounds confident_joint cj to type int
while preserving the totals of reach row.
Assumes that cj is a 2D np.ndarray of type float.
Expand All @@ -266,7 +301,10 @@ def round_preserving_row_totals(confident_joint) -> np.ndarray:
).astype(int)


def estimate_pu_f1(s, prob_s_eq_1) -> float:
def estimate_pu_f1(
s: Union[List[int], npt.NDArray[np.int_]],
prob_s_eq_1: Union[List[float], npt.NDArray["np.floating[T]"]],
) -> Optional[float]:
"""Computes Claesen's estimate of f1 in the pulearning setting.

Parameters
Expand All @@ -289,7 +327,9 @@ def estimate_pu_f1(s, prob_s_eq_1) -> float:
return recall**2 / (2.0 * frac_positive) if frac_positive != 0 else np.nan


def confusion_matrix(true, pred) -> np.ndarray:
def confusion_matrix(
true: npt.NDArray[np.int_], pred: npt.NDArray[np.int_]
) -> npt.NDArray[np.int_]:
"""Implements a confusion matrix for true labels
and predicted labels. true and pred MUST BE the same length
and have the same distinct set of class labels represented.
Expand Down Expand Up @@ -330,13 +370,13 @@ def confusion_matrix(true, pred) -> np.ndarray:


def print_square_matrix(
matrix,
left_name="s",
top_name="y",
title=" A square matrix",
short_title="s,y",
round_places=2,
):
matrix: npt.NDArray["np.floating[T]"],
left_name: str = "s",
top_name: str = "y",
title: str = " A square matrix",
short_title: str = "s,y",
round_places: int = 2,
) -> None:
"""Pretty prints a matrix.

Parameters
Expand Down Expand Up @@ -370,7 +410,7 @@ def print_square_matrix(
print()


def print_noise_matrix(noise_matrix, round_places=2):
def print_noise_matrix(noise_matrix: npt.NDArray["np.floating[T]"], round_places: int = 2) -> None:
"""Pretty prints the noise matrix."""
print_square_matrix(
noise_matrix,
Expand All @@ -380,7 +420,9 @@ def print_noise_matrix(noise_matrix, round_places=2):
)


def print_inverse_noise_matrix(inverse_noise_matrix, round_places=2):
def print_inverse_noise_matrix(
inverse_noise_matrix: npt.NDArray["np.floating[T]"], round_places: int = 2
) -> None:
"""Pretty prints the inverse noise matrix."""
print_square_matrix(
inverse_noise_matrix,
Expand All @@ -392,7 +434,7 @@ def print_inverse_noise_matrix(inverse_noise_matrix, round_places=2):
)


def print_joint_matrix(joint_matrix, round_places=2):
def print_joint_matrix(joint_matrix: npt.NDArray["np.floating[T]"], round_places: int = 2) -> None:
"""Pretty prints the joint label noise matrix."""
print_square_matrix(
joint_matrix,
Expand All @@ -402,7 +444,9 @@ def print_joint_matrix(joint_matrix, round_places=2):
)


def compress_int_array(int_array, num_possible_values) -> np.ndarray:
def compress_int_array(
int_array: Union[Any, npt.NDArray[np.int_]], num_possible_values
) -> Union[Any, npt.NDArray[np.int_]]:
"""Compresses dtype of np.ndarray<int> if num_possible_values is small enough."""
try:
compressed_type = None
Expand Down Expand Up @@ -462,14 +506,18 @@ def train_val_split(
return X_train, X_holdout, labels_train, labels_holdout


def subset_X_y(X, labels, mask) -> Tuple[DatasetLike, LabelLike]:
def subset_X_y(
    X, labels: Union[list, np.ndarray, pd.Series], mask: npt.NDArray[np.bool_]
) -> Tuple[DatasetLike, LabelLike]:
    """Return the (features, labels) pair restricted to the entries where ``mask`` is True.

    Delegates the actual filtering to ``subset_data`` for the features and
    ``subset_labels`` for the labels, so any dataset/label format those helpers
    accept is supported here as well.
    """
    X_subset = subset_data(X, mask)
    labels_subset = subset_labels(labels, mask)
    return X_subset, labels_subset


def subset_labels(labels, mask) -> Union[list, np.ndarray, pd.Series]:
def subset_labels(
labels: Union[list, np.ndarray, pd.Series], mask: npt.NDArray[np.bool_]
) -> Union[list, np.ndarray, pd.Series]:
"""Extracts subset of labels where mask is True"""
try: # filtering labels as if it is array or DataFrame
return labels[mask]
Expand Down Expand Up @@ -680,7 +728,7 @@ def get_num_classes(labels=None, pred_probs=None, label_matrix=None, multi_label
return num_unique_classes(labels, multi_label=multi_label)


def num_unique_classes(labels, multi_label=None) -> int:
def num_unique_classes(labels: LabelLike, multi_label: Optional[bool] = None) -> int:
"""Finds the number of unique classes for both single-labeled
and multi-labeled labels. If multi_label is set to None (default)
this method will infer if multi_label is True or False based on
Expand All @@ -690,7 +738,7 @@ def num_unique_classes(labels, multi_label=None) -> int:
return len(get_unique_classes(labels, multi_label))


def get_unique_classes(labels, multi_label=None) -> set:
def get_unique_classes(labels: LabelLike, multi_label: Optional[bool] = None) -> Set[LabelLike]:
"""Returns the set of unique classes for both single-labeled
and multi-labeled labels. If multi_label is set to None (default)
this method will infer if multi_label is True or False based on
Expand All @@ -705,7 +753,7 @@ def get_unique_classes(labels, multi_label=None) -> set:
return set(labels)


def format_labels(labels: LabelLike) -> Tuple[np.ndarray, dict]:
def format_labels(labels: LabelLike) -> Tuple[np.ndarray, Dict[int, Any]]:
"""Takes an array of labels and formats it such that labels are in the set ``0, 1, ..., K-1``,
where ``K`` is the number of classes. The labels are assigned based on lexicographic order.
This is useful for mapping string class labels to the integer format required by many cleanlab (and sklearn) functions.
Expand All @@ -730,7 +778,7 @@ def format_labels(labels: LabelLike) -> Tuple[np.ndarray, dict]:
return formatted_labels, inverse_map


def smart_display_dataframe(df): # pragma: no cover
def smart_display_dataframe(df: pd.DataFrame) -> None: # pragma: no cover
"""Display a pandas dataframe if in a jupyter notebook, otherwise print it to console."""
try:
from IPython.display import display
Expand Down
Loading