[Perf] Evaluate API: Support parallelized evaluator batch run through pf.run #3380

Merged · 16 commits · Jun 17, 2024

Changes from all commits
1 change: 1 addition & 0 deletions src/promptflow-devkit/promptflow/_sdk/_constants.py
@@ -98,6 +98,7 @@ def _prepare_home_dir() -> Path:
 PF_TRACE_CONTEXT_ATTR = "attributes"
 PF_SERVICE_DEBUG = "PF_SERVICE_DEBUG"
 PF_SYSTEM_METRICS_PREFIX = "__pf__"
+PF_FLOW_ENTRY_IN_TMP = "PF_FLOW_ENTRY_IN_TMP"

 LOCAL_MGMT_DB_PATH = (HOME_PROMPT_FLOW_DIR / "pf.sqlite").resolve()
 LOCAL_MGMT_DB_SESSION_ACQUIRE_LOCK_PATH = (HOME_PROMPT_FLOW_DIR / "pf.sqlite.lock").resolve()
@@ -53,6 +53,7 @@
     NODE,
     NODE_VARIANTS,
     NODES,
+    PF_FLOW_ENTRY_IN_TMP,
     PROMPT_FLOW_DIR_NAME,
     REFRESH_CONNECTIONS_DIR_LOCK_PATH,
     REGISTRY_URI_PREFIX,
@@ -1019,8 +1020,16 @@ def create_temp_flex_flow_yaml_core(entry: Union[str, PathLike, Callable], code:
         logger.warning(f"Found existing {flow_yaml_path.as_posix()}, will not respect it in runtime.")
         with open(flow_yaml_path, "r", encoding=DEFAULT_ENCODING) as f:
             existing_content = f.read()
-    if not is_local_module(entry_string=entry, code=code):
-        logger.debug(f"Entry {entry} is not found in local, it's snapshot will be empty.")
+
+    create_yaml_in_tmp = False
+    if os.environ.get(PF_FLOW_ENTRY_IN_TMP, "False").lower() == "true":
+        logger.debug("PF_FLOW_ENTRY_IN_TMP is set to true, its snapshot will be empty.")
+        create_yaml_in_tmp = True
+    elif not is_local_module(entry_string=entry, code=code):
+        logger.debug(f"Entry {entry} is not found in local, its snapshot will be empty.")
+        create_yaml_in_tmp = True
+
+    if create_yaml_in_tmp:
         # make sure run name is from entry instead of random folder name
         temp_dir = tempfile.mkdtemp(prefix=_sanitize_python_variable_name(entry) + "_")
         flow_yaml_path = Path(temp_dir) / FLOW_FLEX_YAML
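
Any caller can flip the same switch; a minimal sketch of opting in manually, assuming the variable is set before the flow YAML is generated:

    import os

    from promptflow._sdk._constants import PF_FLOW_ENTRY_IN_TMP

    # Route the generated flow.flex.yaml into a fresh temp directory instead of
    # the entry's code folder; create_temp_flex_flow_yaml_core reads this flag.
    os.environ[PF_FLOW_ENTRY_IN_TMP] = "true"

In this PR the flag is only set by BatchRunContext (below) around ProxyClient runs, and removed again on exit.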
8 changes: 5 additions & 3 deletions src/promptflow-evals/promptflow/evals/_constants.py
@@ -14,11 +14,13 @@ class EvaluationMetrics:


 class Prefixes:
-    _INPUTS = 'inputs.'
-    _OUTPUTS = 'outputs.'
-    _TGT_OUTPUTS = '__outputs.'
+    _INPUTS = "inputs."
+    _OUTPUTS = "outputs."
+    _TGT_OUTPUTS = "__outputs."


 DEFAULT_EVALUATION_RESULTS_FILE_NAME = "evaluation_results.json"

 CONTENT_SAFETY_DEFECT_RATE_THRESHOLD_DEFAULT = 4

+BATCH_RUN_TIMEOUT = 3600
@@ -0,0 +1,8 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from .batch_run_context import BatchRunContext
+from .code_client import CodeClient
+from .proxy_client import ProxyClient
+
+__all__ = ["CodeClient", "ProxyClient", "BatchRunContext"]
@@ -0,0 +1,32 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import os
+
+from promptflow._sdk._constants import PF_FLOW_ENTRY_IN_TMP
+from promptflow._utils.user_agent_utils import ClientUserAgentUtil
+from promptflow.tracing._integrations._openai_injector import inject_openai_api, recover_openai_api
+
+from ..._user_agent import USER_AGENT
+from .code_client import CodeClient
+from .proxy_client import ProxyClient
+
+
+class BatchRunContext:
+    def __init__(self, client):
+        self.client = client
+
+    def __enter__(self):
+        if isinstance(self.client, CodeClient):
+            ClientUserAgentUtil.append_user_agent(USER_AGENT)
+            inject_openai_api()
+
+        if isinstance(self.client, ProxyClient):
+            os.environ[PF_FLOW_ENTRY_IN_TMP] = "true"
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        if isinstance(self.client, CodeClient):
+            recover_openai_api()
+
+        if isinstance(self.client, ProxyClient):
+            os.environ.pop(PF_FLOW_ENTRY_IN_TMP, None)
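
A sketch of the intended wiring (the evaluator callable and data file are hypothetical): BatchRunContext keys its setup and teardown off the concrete client type, so the same with-block serves both execution paths:

    from promptflow.client import PFClient

    client = ProxyClient(PFClient())  # a CodeClient would trigger the OpenAI-injection path instead

    with BatchRunContext(client):
        # For ProxyClient, PF_FLOW_ENTRY_IN_TMP is set on entry and popped on
        # exit, so evaluator snapshots land in temp folders, not the code dir.
        run = client.run(flow=answer_length_evaluator, data="eval_data.jsonl")
        details = client.get_details(run)

Because __exit__ pops the variable with a None default, the context exits cleanly even if the variable was never set.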
@@ -7,31 +7,15 @@

 import pandas as pd

-from promptflow._utils.user_agent_utils import ClientUserAgentUtil
-from promptflow.evals.evaluate._utils import _apply_column_mapping, load_jsonl, _has_aggregator
-from promptflow.tracing import ThreadPoolExecutorWithContext as ThreadPoolExecutor
-from promptflow.tracing._integrations._openai_injector import inject_openai_api, recover_openai_api
 from promptflow.contracts.types import AttrDict
+from promptflow.evals.evaluate._utils import _apply_column_mapping, _has_aggregator, load_jsonl
+from promptflow.tracing import ThreadPoolExecutorWithContext as ThreadPoolExecutor

-from ..._user_agent import USER_AGENT
+from ..._constants import BATCH_RUN_TIMEOUT

 LOGGER = logging.getLogger(__name__)


-class BatchRunContext:
-    def __init__(self, client):
-        self.client = client
-
-    def __enter__(self):
-        if isinstance(self.client, CodeClient):
-            ClientUserAgentUtil.append_user_agent(USER_AGENT)
-            inject_openai_api()
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        if isinstance(self.client, CodeClient):
-            recover_openai_api()
-
-
 class CodeRun:
     def __init__(self, run, input_data, evaluator_name=None, aggregated_metrics=None, **kwargs):
         self.run = run
@@ -40,22 +24,27 @@ def __init__(self, run, input_data, evaluator_name=None, aggregated_metrics=None
         self.aggregated_metrics = aggregated_metrics

     def get_result_df(self, exclude_inputs=False):
-        result_df = self.run.result(timeout=60 * 60)
+        result_df = self.run.result(timeout=BATCH_RUN_TIMEOUT)
         if exclude_inputs:
             result_df = result_df.drop(columns=[col for col in result_df.columns if col.startswith("inputs.")])
         return result_df

     def get_aggregated_metrics(self):
         try:
-            aggregated_metrics = self.aggregated_metrics.result(timeout=60 * 60) \
-                if self.aggregated_metrics is not None else None
+            aggregated_metrics = (
+                self.aggregated_metrics.result(timeout=BATCH_RUN_TIMEOUT)
+                if self.aggregated_metrics is not None
+                else None
+            )
         except Exception as ex:
             LOGGER.debug(f"Error calculating metrics for evaluator {self.evaluator_name}, failed with error {str(ex)}")
             aggregated_metrics = None

         if not isinstance(aggregated_metrics, dict):
-            LOGGER.warning(f"Aggregated metrics for evaluator {self.evaluator_name}"
-                           f" is not a dictionary will not be logged as metrics")
+            LOGGER.warning(
+                f"Aggregated metrics for evaluator {self.evaluator_name}"
+                f" is not a dictionary will not be logged as metrics"
+            )

         aggregated_metrics = aggregated_metrics if isinstance(aggregated_metrics, dict) else {}

@@ -71,8 +60,11 @@ def _calculate_metric(self, evaluator, input_df, column_mapping, evaluator_name)
         row_metric_results = []
         input_df = _apply_column_mapping(input_df, column_mapping)
         # Ignoring args and kwargs from the signature since they are usually catching extra arguments
-        parameters = {param.name for param in inspect.signature(evaluator).parameters.values()
-                      if param.name not in ['args', 'kwargs']}
+        parameters = {
+            param.name
+            for param in inspect.signature(evaluator).parameters.values()
+            if param.name not in ["args", "kwargs"]
+        }
         for value in input_df.to_dict("records"):
             # Filter out only the parameters that are present in the input data
             # if no parameters then pass data as is
@@ -83,7 +75,7 @@ def _calculate_metric(self, evaluator, input_df, column_mapping, evaluator_name)
             try:
                 result = row_metric_future.result()
                 if not isinstance(result, dict):
-                    result = {'output': result}
+                    result = {"output": result}
                 row_metric_results.append(result)
             except Exception as ex:  # pylint: disable=broad-except
                 msg_1 = f"Error calculating value for row {row_number} for metric {evaluator_name}, "
@@ -114,8 +106,9 @@ def _calculate_aggregations(self, evaluator, run):
             aggregated_output = aggr_func(aggregate_input)
             return aggregated_output
         except Exception as ex:
-            LOGGER.warning(f"Error calculating aggregations for evaluator {run.evaluator_name},"
-                           f" failed with error {str(ex)}")
+            LOGGER.warning(
+                f"Error calculating aggregations for evaluator {run.evaluator_name}," f" failed with error {str(ex)}"
+            )
         return None

     def run(self, flow, data, evaluator_name=None, column_mapping=None, **kwargs):
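
One detail worth calling out from _calculate_metric above: each row is filtered against the evaluator's signature, so an evaluator only receives the columns it declares. A standalone sketch of the same trick (the helper name is hypothetical, not part of the PR):

    import inspect

    def filter_row_to_signature(evaluator, row: dict) -> dict:
        # Ignore *args/**kwargs catch-alls, then keep only the row keys that
        # match declared parameter names; with no usable parameters, pass the
        # row through unchanged.
        parameters = {
            param.name
            for param in inspect.signature(evaluator).parameters.values()
            if param.name not in ["args", "kwargs"]
        }
        return {k: v for k, v in row.items() if k in parameters} if parameters else row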
@@ -0,0 +1,40 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import logging
+
+import numpy as np
+
+from promptflow.client import PFClient
+from promptflow.tracing import ThreadPoolExecutorWithContext as ThreadPoolExecutor
+
+from ..._constants import BATCH_RUN_TIMEOUT
+
+LOGGER = logging.getLogger(__name__)
+
+
+class ProxyRun:
+    def __init__(self, run, **kwargs):
+        self.run = run
+
+
+class ProxyClient:
+    def __init__(self, pf_client: PFClient):
+        self._pf_client = pf_client
+        self._thread_pool = ThreadPoolExecutor(thread_name_prefix="evaluators_thread")
+
+    def run(self, flow, data, column_mapping=None, **kwargs):
+        eval_future = self._thread_pool.submit(
+            self._pf_client.run, flow, data=data, column_mapping=column_mapping, **kwargs
+        )
+        return ProxyRun(run=eval_future)
+
+    def get_details(self, proxy_run, all_results=False):
+        run = proxy_run.run.result(timeout=BATCH_RUN_TIMEOUT)
+        result_df = self._pf_client.get_details(run, all_results=all_results)
+        result_df.replace("(Failed)", np.nan, inplace=True)
+        return result_df
+
+    def get_metrics(self, proxy_run):
+        run = proxy_run.run.result(timeout=BATCH_RUN_TIMEOUT)
+        return self._pf_client.get_metrics(run)
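
Taken together: ProxyClient.run returns immediately with a future-backed ProxyRun, and blocking is deferred to get_details/get_metrics, which is what lets several evaluators' pf.run batches proceed in parallel. A usage sketch (evaluator and data file are hypothetical):

    from promptflow.client import PFClient

    client = ProxyClient(PFClient())
    proxy_run = client.run(flow=my_evaluator, data="eval_data.jsonl")  # non-blocking submit
    details = client.get_details(proxy_run, all_results=True)  # waits up to BATCH_RUN_TIMEOUT (3600 s)
    metrics = client.get_metrics(proxy_run)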

This file was deleted.
