From c9c41303ebaa7d3036abe077f14ad21ff97da331 Mon Sep 17 00:00:00 2001 From: Anatoly Volkov Date: Wed, 13 May 2026 10:42:42 -0700 Subject: [PATCH 1/3] Initial commit --- configs/throughput/example.json | 71 ++++ sklbench/benchmarks/throughput_worker.py | 247 +++++++++++++ sklbench/runner/arguments.py | 39 ++ sklbench/runner/implementation.py | 25 +- sklbench/runner/throughput.py | 445 +++++++++++++++++++++++ sklbench/utils/core_assignment.py | 127 +++++++ sklbench/utils/measurement.py | 2 + 7 files changed, 947 insertions(+), 9 deletions(-) create mode 100644 configs/throughput/example.json create mode 100644 sklbench/benchmarks/throughput_worker.py create mode 100644 sklbench/runner/throughput.py create mode 100644 sklbench/utils/core_assignment.py diff --git a/configs/throughput/example.json b/configs/throughput/example.json new file mode 100644 index 00000000..3ce9fd60 --- /dev/null +++ b/configs/throughput/example.json @@ -0,0 +1,71 @@ +{ + "INCLUDE": ["../common/sklearn.json"], + "PARAMETERS_SETS": { + "common parameters": { + "data": { + "split_kwargs": { + "train_size": 8000, + "test_size": 2000, + "shuffle": true, + "random_state": 42 + } + }, + "algorithm": { "device": "default" } + }, + "throughput settings": { + "bench": { + "num_instances": 4, + "cores_per_instance": 4, + "measurement_duration": 30 + } + }, + "datasets": { + "data": [ + { + "source": "make_classification", + "generation_kwargs": { + "n_classes": 2, + "n_samples": 10000, + "n_features": 64, + "n_informative": 32 + } + } + ] + }, + "algorithms": [ + { + "algorithm": { + "estimator": "RandomForestClassifier", + "estimator_params": { "n_estimators": 50 } + } + }, + { + "algorithm": { + "estimator": "KMeans", + "estimator_params": { + "n_clusters": 10, + "init": "random", + "algorithm": "lloyd", + "max_iter": 100 + } + } + }, + { + "algorithm": { + "estimator": "LinearRegression" + } + } + ] + }, + "TEMPLATES": { + "throughput_test": { + "SETS": [ + "sklearn-ex[cpu] implementations", + "common parameters", + "throughput settings", + "datasets", + "algorithms" + ] + } + } +} diff --git a/sklbench/benchmarks/throughput_worker.py b/sklbench/benchmarks/throughput_worker.py new file mode 100644 index 00000000..36167c02 --- /dev/null +++ b/sklbench/benchmarks/throughput_worker.py @@ -0,0 +1,247 @@ +# =============================================================================== +# Copyright 2024 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== + +import argparse +import inspect +import json +import socket +import sys +import time +from typing import Dict, List, Tuple + +from ..datasets import load_data +from ..datasets.transformer import split_and_transform_data +from ..utils.bench_case import get_bench_case_value +from ..utils.common import convert_to_numpy +from ..utils.config import bench_case_filter +from ..utils.custom_types import BenchCase +from ..utils.logger import logger +from ..utils.special_params import assign_case_special_values_on_run +from .sklearn_estimator import ( + estimator_to_task, + get_estimator, + get_estimator_methods, + get_subset_metrics_of_estimator, + validate_estimator_params, +) + + +def barrier_wait(sock: socket.socket, msg_send: bytes, msg_expect_prefix: bytes): + """Send a message and block until response from parent.""" + sock.sendall(msg_send) + data = b"" + while not data.startswith(msg_expect_prefix): + chunk = sock.recv(1024) + if not chunk: + raise ConnectionError("Barrier socket closed unexpectedly") + data += chunk + + +def run_measurement_loop( + func, args: tuple, measurement_duration: float +) -> Dict[str, List]: + """Run func repeatedly for measurement_duration seconds, recording each iteration.""" + start_timestamps = [] + durations_ms = [] + end_time = time.time() + measurement_duration + while time.time() < end_time: + t0 = time.time() + func(*args) + t1 = time.time() + start_timestamps.append(t0) + durations_ms.append((t1 - t0) * 1000) + return {"start_ts": start_timestamps, "duration_ms": durations_ms} + + +def prepare_estimator(bench_case: BenchCase) -> Tuple: + """Load data, create estimator, return everything needed for measurement.""" + library_name = get_bench_case_value(bench_case, "algorithm:library") + estimator_name = get_bench_case_value(bench_case, "algorithm:estimator") + + estimator_class = get_estimator(library_name, estimator_name) + task = estimator_to_task(estimator_name) + + data, data_description = load_data(bench_case) + (x_train, x_test, y_train, y_test), data_description = split_and_transform_data( + bench_case, data, data_description + ) + + assign_case_special_values_on_run( + bench_case, (x_train, y_train, x_test, y_test), data_description + ) + + estimator_params = get_bench_case_value( + bench_case, "algorithm:estimator_params", dict() + ) + estimator_params = validate_estimator_params(estimator_class, estimator_params) + estimator_methods = get_estimator_methods(bench_case) + + return ( + estimator_class, + estimator_params, + estimator_methods, + task, + x_train, + x_test, + y_train, + y_test, + data_description, + ) + + +def get_method_and_args(estimator_instance, method_name, stage, x_train, x_test, y_train, y_test): + """Get bound method and appropriate data arguments.""" + method_instance = getattr(estimator_instance, method_name) + if "y" in list(inspect.signature(method_instance).parameters): + if stage == "training": + data_args = (x_train, y_train) + else: + data_args = (x_test, y_test) + else: + if stage == "training": + data_args = (x_train,) + else: + data_args = (x_test,) + return method_instance, data_args + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--bench-case", required=True, type=str) + parser.add_argument("--filters", required=True, type=str) + parser.add_argument("--instance-id", required=True, type=int) + parser.add_argument("--barrier-port", required=True, type=int) + parser.add_argument("--measurement-duration", required=True, type=float) + parser.add_argument( + "--log-level", + default="WARNING", + type=str, + choices=("ERROR", "WARNING", "INFO", "DEBUG"), + ) + args = parser.parse_args() + + logger.setLevel(args.log_level) + + bench_case = json.loads(args.bench_case) + filters = json.loads(args.filters)["filters"] + + if not bench_case_filter(bench_case, filters): + logger.warning("Benchmarking case was filtered.") + print(json.dumps({"instance_id": args.instance_id, "filtered": True})) + return + + # --- Preparation phase (unlimited time) --- + ( + estimator_class, + estimator_params, + estimator_methods, + task, + x_train, + x_test, + y_train, + y_test, + data_description, + ) = prepare_estimator(bench_case) + + estimator_instance = estimator_class(**estimator_params) + + # Warmup: run one fit to trigger JIT/allocations + training_methods = estimator_methods.get("training", ["fit"]) + for method_name in training_methods: + if hasattr(estimator_instance, method_name): + method_instance, data_args = get_method_and_args( + estimator_instance, method_name, "training", + x_train, x_test, y_train, y_test + ) + method_instance(*data_args) + break + + # --- Connect to barrier --- + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(("localhost", args.barrier_port)) + sock.sendall(b"ready") + + # --- Measurement stages --- + stages_results = {} + + for stage in ["training", "inference"]: + methods = estimator_methods.get(stage, []) + available_methods = [m for m in methods if hasattr(estimator_instance, m)] + if not available_methods: + continue + + # Wait for "go" signal from parent before each stage + data = b"" + while b"go" not in data: + chunk = sock.recv(1024) + if not chunk: + raise ConnectionError("Barrier socket closed unexpectedly") + data += chunk + + method_name = available_methods[0] + method_instance, data_args = get_method_and_args( + estimator_instance, method_name, stage, + x_train, x_test, y_train, y_test + ) + + timing_data = run_measurement_loop( + method_instance, data_args, args.measurement_duration + ) + + stages_results[stage] = { + "method": method_name, + "iterations_completed": len(timing_data["start_ts"]), + "start_ts": timing_data["start_ts"], + "duration_ms": timing_data["duration_ms"], + } + + # Signal done to parent + sock.sendall(b"done") + + # --- Compute quality metrics from final fitted model --- + quality_metrics = {} + quality_metrics.update( + get_subset_metrics_of_estimator( + task, "training", estimator_instance, (x_train, y_train) + ) + ) + quality_metrics.update( + get_subset_metrics_of_estimator( + task, "inference", estimator_instance, (x_test, y_test) + ) + ) + + # Get final estimator params + final_params = {} + if hasattr(estimator_instance, "get_params"): + final_params = estimator_instance.get_params() + if "handle" in final_params: + del final_params["handle"] + + sock.close() + + # --- Output --- + output = { + "instance_id": args.instance_id, + "stages": stages_results, + "quality_metrics": quality_metrics, + "estimator_params": final_params, + } + print(json.dumps(output)) + + +if __name__ == "__main__": + main() diff --git a/sklbench/runner/arguments.py b/sklbench/runner/arguments.py index 1ba47daa..63fcdf13 100644 --- a/sklbench/runner/arguments.py +++ b/sklbench/runner/arguments.py @@ -137,6 +137,45 @@ def add_runner_arguments(parser: argparse.ArgumentParser) -> argparse.ArgumentPa action="store_true", help="Interrupt runner and exit if last benchmark failed with error.", ) + # throughput mode arguments + parser.add_argument( + "--throughput-mode", + default=False, + action="store_true", + help="Run in throughput mode: multiple synchronized parallel instances " + "with CPU pinning via numactl.", + ) + parser.add_argument( + "--num-instances", + type=int, + default=None, + help="Number of parallel instances in throughput mode.", + ) + parser.add_argument( + "--cores-per-instance", + type=int, + default=None, + help="CPU cores per instance in throughput mode.", + ) + parser.add_argument( + "--measurement-duration", + type=float, + default=60.0, + help="Duration (seconds) for each measurement stage in throughput mode.", + ) + parser.add_argument( + "--emergency-timeout", + type=float, + default=3600.0, + help="Emergency subprocess timeout (seconds). Safety net only.", + ) + parser.add_argument( + "--throughput-full-logs", + default=False, + action="store_true", + help="Store per-iteration start_ts and duration_ms arrays in throughput results. " + "Disabled by default to reduce output size.", + ) # option to get parser description in Markdown table format for READMEs parser.add_argument( "--describe-parser", diff --git a/sklbench/runner/implementation.py b/sklbench/runner/implementation.py index cac0bba4..522379f5 100644 --- a/sklbench/runner/implementation.py +++ b/sklbench/runner/implementation.py @@ -106,13 +106,20 @@ def run_benchmarks(args: argparse.Namespace) -> int: pool.map(load_data_with_cleanup, dataset_cases.values()) # run bench_cases - return_code, result = call_benchmarks( - bench_cases, - param_filters, - args.bench_log_level, - args.environment_name, - args.exit_on_error, - ) + if args.throughput_mode: + from .throughput import run_throughput_benchmarks + + return_code, result = run_throughput_benchmarks( + bench_cases, param_filters, args + ) + else: + return_code, result = call_benchmarks( + bench_cases, + param_filters, + args.bench_log_level, + args.environment_name, + args.exit_on_error, + ) # output raw result logger.debug(custom_format(result)) @@ -121,8 +128,8 @@ def run_benchmarks(args: argparse.Namespace) -> int: with open(args.result_file, "w") as fp: json.dump(result, fp, indent=4) - # output as pandas dataframe - if len(result["bench_cases"]) != 0: + # output as pandas dataframe (skip for throughput mode which has nested results) + if len(result["bench_cases"]) != 0 and not args.throughput_mode: for key, df in get_result_tables_as_df(result).items(): logger.info(f'{custom_format(key, bcolor="HEADER")}\n{df}') diff --git a/sklbench/runner/throughput.py b/sklbench/runner/throughput.py new file mode 100644 index 00000000..5e8aa4a2 --- /dev/null +++ b/sklbench/runner/throughput.py @@ -0,0 +1,445 @@ +# =============================================================================== +# Copyright 2024 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== + +import argparse +import json +import socket +import subprocess +import time +from typing import Dict, List, Tuple, Union + +import numpy as np +from tqdm import tqdm + +from ..utils.bench_case import get_bench_case_name, get_bench_case_value +from ..utils.common import custom_format, hash_from_json_repr +from ..utils.core_assignment import compute_core_assignments +from ..utils.custom_types import BenchCase +from ..utils.env import get_environment_info, get_numa_cpus_conf +from ..utils.logger import logger + + +def validate_throughput_args( + num_instances: int, cores_per_instance: int, measurement_duration: float +): + if num_instances is None or num_instances < 1: + raise ValueError( + "--num-instances is required and must be >= 1 in throughput mode" + ) + if cores_per_instance is None or cores_per_instance < 1: + raise ValueError( + "--cores-per-instance is required and must be >= 1 in throughput mode" + ) + if measurement_duration <= 0: + raise ValueError("--measurement-duration must be > 0") + + +def create_barrier_server() -> Tuple[socket.socket, int]: + """Create a TCP server socket on localhost with OS-assigned port.""" + server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server.bind(("localhost", 0)) + server.listen(128) + port = server.getsockname()[1] + return server, port + + +def wait_for_workers_ready( + server: socket.socket, num_instances: int, timeout: float +) -> List[socket.socket]: + """Accept connections from all workers and wait for 'ready' message.""" + server.settimeout(timeout) + connections = [] + for _ in range(num_instances): + conn, _ = server.accept() + data = b"" + while b"ready" not in data: + chunk = conn.recv(1024) + if not chunk: + raise ConnectionError("Worker disconnected before sending 'ready'") + data += chunk + connections.append(conn) + return connections + + +def send_go_to_all(connections: List[socket.socket]): + """Send 'go' signal to all workers.""" + for conn in connections: + conn.sendall(b"go") + + +def wait_for_workers_done(connections: List[socket.socket], timeout: float): + """Wait for 'done' message from all workers.""" + for conn in connections: + conn.settimeout(timeout) + data = b"" + while b"done" not in data: + chunk = conn.recv(1024) + if not chunk: + raise ConnectionError("Worker disconnected before sending 'done'") + data += chunk + + +def validate_sync_quality(instance_outputs: List[Dict], stage: str): + """Check if all instances started measurement within 100ms of each other.""" + start_times = [] + for output in instance_outputs: + if stage in output.get("stages", {}): + timestamps = output["stages"][stage].get("start_ts", []) + if timestamps: + start_times.append(timestamps[0]) + + if len(start_times) >= 2: + spread_ms = (max(start_times) - min(start_times)) * 1000 + if spread_ms > 100: + logger.warning( + f"Sync quality warning for '{stage}' stage: " + f"start time spread across instances is {spread_ms:.1f}ms (>100ms)" + ) + + +def compute_instance_stats(durations: List[float], start_timestamps: List[float]) -> Dict: + """Compute summary statistics for a single instance's measurement.""" + arr = np.array(durations) + first_start = start_timestamps[0] if start_timestamps else 0.0 + last_end_ts = start_timestamps[-1] + durations[-1] / 1000 if durations else 0.0 + total_actual_time_sec = last_end_ts - first_start if start_timestamps else 0.0 + + return { + "mean_duration_ms": float(np.mean(arr)), + "std_duration_ms": float(np.std(arr)), + "median_duration_ms": float(np.median(arr)), + "p01_duration_ms": float(np.percentile(arr, 1)), + "p25_duration_ms": float(np.percentile(arr, 25)), + "p75_duration_ms": float(np.percentile(arr, 75)), + "p99_duration_ms": float(np.percentile(arr, 99)), + "first_iteration_ms": float(arr[0]), + "total_actual_time_sec": float(total_actual_time_sec), + "first_start_ts": float(first_start), + } + + +def aggregate_stage_results( + instance_outputs: List[Dict], + stage: str, + measurement_duration: float, + core_assignments: List[str], + full_logs: bool = False, +) -> Dict: + """Aggregate per-instance results into a single stage result entry.""" + instances = [] + all_iterations = [] + + for output in instance_outputs: + if output.get("filtered"): + continue + stage_data = output.get("stages", {}).get(stage) + if stage_data is None: + continue + + iters = stage_data["iterations_completed"] + durations = stage_data["duration_ms"] + start_timestamps = stage_data["start_ts"] + throughput = iters / measurement_duration if measurement_duration > 0 else 0.0 + + instance_entry = { + "instance_id": output["instance_id"], + "taskset": core_assignments[output["instance_id"]], + "iterations_completed": iters, + "throughput_iterations_per_sec": float(throughput), + } + instance_entry.update(compute_instance_stats(durations, start_timestamps)) + + if full_logs: + instance_entry["start_ts"] = start_timestamps + instance_entry["duration_ms"] = durations + + instances.append(instance_entry) + all_iterations.append(iters) + + if not instances: + return {} + + throughputs = [inst["throughput_iterations_per_sec"] for inst in instances] + + aggregate = { + "total_iterations": int(sum(all_iterations)), + "total_throughput_iterations_per_sec": float(sum(throughputs)), + "mean_throughput_per_instance": float(np.mean(throughputs)), + "std_throughput_per_instance": float(np.std(throughputs)), + "min_iterations_per_instance": int(min(all_iterations)), + "max_iterations_per_instance": int(max(all_iterations)), + "measurement_wall_time_sec": measurement_duration, + } + + return {"instances": instances, "aggregate": aggregate} + + +def run_single_throughput_case( + bench_case: BenchCase, + filters: List[BenchCase], + num_instances: int, + cores_per_instance: int, + measurement_duration: float, + emergency_timeout: float, + log_level: str, + full_logs: bool = False, +) -> Tuple[int, List[Dict]]: + """Run a single benchmark case in throughput mode.""" + numa_conf = get_numa_cpus_conf() + core_assignments = compute_core_assignments( + num_instances, cores_per_instance, numa_conf if numa_conf else None + ) + + logger.info( + f"Core assignments for {num_instances} instances: {core_assignments}" + ) + + server, port = create_barrier_server() + logger.debug(f"Barrier server listening on localhost:{port}") + + bench_case_str = json.dumps(bench_case).replace(" ", "") + filters_str = json.dumps({"filters": filters}).replace(" ", "") + + processes = [] + for i in range(num_instances): + cmd = ( + f"numactl --physcpubind={core_assignments[i]} --localalloc " + f"python -m sklbench.benchmarks.throughput_worker " + f"--bench-case {bench_case_str} " + f"--filters {filters_str} " + f"--log-level {log_level} " + f"--instance-id {i} " + f"--barrier-port {port} " + f"--measurement-duration {measurement_duration}" + ) + logger.debug(f"Launching instance {i}: {cmd}") + proc = subprocess.Popen( + cmd.split(" "), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + ) + processes.append(proc) + + try: + # Wait for all workers to be ready (prep phase - unlimited, but bounded by emergency timeout) + connections = wait_for_workers_ready(server, num_instances, emergency_timeout) + logger.info("All workers ready, starting measurement stages") + + # Determine which stages exist + estimator_methods_training = get_bench_case_value( + bench_case, "algorithm:estimator_methods:training", None + ) + estimator_methods_inference = get_bench_case_value( + bench_case, "algorithm:estimator_methods:inference", None + ) + stages = [] + if estimator_methods_training is not None: + stages = ["training", "inference"] + else: + # default stages + stages = ["training", "inference"] + + stage_timeout = measurement_duration + 60 # extra time for one stage + + for stage in stages: + logger.info(f"Sending 'go' for {stage} stage") + send_go_to_all(connections) + wait_for_workers_done(connections, stage_timeout) + logger.info(f"All workers done with {stage} stage") + + # Close barrier connections + for conn in connections: + conn.close() + + except Exception as e: + logger.error(f"Barrier synchronization failed: {e}") + for proc in processes: + proc.kill() + server.close() + return -1, [] + + server.close() + + # Collect outputs + instance_outputs = [] + return_code = 0 + for i, proc in enumerate(processes): + try: + stdout, stderr = proc.communicate(timeout=30) + except subprocess.TimeoutExpired: + proc.kill() + stdout, stderr = proc.communicate() + + if proc.returncode != 0: + logger.warning( + f"Instance {i} returned non-zero code={proc.returncode}.\n" + f"stderr: {stderr}" + ) + return_code = proc.returncode + continue + + if stderr: + logger.debug(f"Instance {i} stderr: {stderr}") + + try: + output = json.loads(stdout) + instance_outputs.append(output) + except json.JSONDecodeError: + logger.warning(f"Instance {i}: unable to parse stdout as JSON") + return_code = -1 + + if not instance_outputs: + return return_code, [] + + # Validate sync quality + for stage in stages: + validate_sync_quality(instance_outputs, stage) + + # Build result entries (one per stage) + results = [] + estimator_name = get_bench_case_value(bench_case, "algorithm:estimator") + library_name = get_bench_case_value(bench_case, "algorithm:library") + from .commands_helper import generate_benchmark_command + + from ..benchmarks.sklearn_estimator import estimator_to_task + + task = estimator_to_task(estimator_name) + + # Get quality metrics from first instance (all should be similar) + quality_metrics = instance_outputs[0].get("quality_metrics", {}) + final_estimator_params = instance_outputs[0].get("estimator_params", {}) + + for stage in stages: + stage_result = aggregate_stage_results( + instance_outputs, stage, measurement_duration, core_assignments, full_logs + ) + if not stage_result: + continue + + # Find the method name from first instance + method = "unknown" + for output in instance_outputs: + stage_data = output.get("stages", {}).get(stage) + if stage_data: + method = stage_data.get("method", "unknown") + break + + result_entry = { + "mode": "throughput", + "stage": stage, + "method": method, + "task": task, + "estimator": estimator_name, + "library": library_name, + "device": get_bench_case_value(bench_case, "algorithm:device"), + "num_instances": num_instances, + "cores_per_instance": cores_per_instance, + "measurement_duration_seconds": measurement_duration, + } + result_entry.update(quality_metrics) + result_entry.update(final_estimator_params) + result_entry.update(stage_result) + results.append(result_entry) + + return return_code, results + + +def run_throughput_benchmarks( + bench_cases: List[BenchCase], + filters: List[BenchCase], + args: argparse.Namespace, +) -> Tuple[int, Dict[str, Union[Dict, List]]]: + """Main entry point for throughput mode.""" + env_info = get_environment_info() + environment_name = args.environment_name or hash_from_json_repr(env_info) + + # Resolve global defaults from CLI + default_num_instances = args.num_instances + default_cores_per_instance = args.cores_per_instance + default_measurement_duration = args.measurement_duration + default_emergency_timeout = args.emergency_timeout + full_logs = args.throughput_full_logs + + results = [] + return_code = 0 + + bench_cases_with_pbar = tqdm(bench_cases) + for bench_case in bench_cases_with_pbar: + bench_cases_with_pbar.set_description( + custom_format( + get_bench_case_name(bench_case, shortened=True), bcolor="HEADER" + ) + ) + + # Per-case config overrides CLI defaults + num_instances = get_bench_case_value( + bench_case, "bench:num_instances", default_num_instances + ) + cores_per_instance = get_bench_case_value( + bench_case, "bench:cores_per_instance", default_cores_per_instance + ) + measurement_duration = get_bench_case_value( + bench_case, "bench:measurement_duration", default_measurement_duration + ) + emergency_timeout = get_bench_case_value( + bench_case, "bench:emergency_timeout", default_emergency_timeout + ) + + try: + validate_throughput_args( + num_instances, cores_per_instance, measurement_duration + ) + except ValueError as e: + logger.error(f"Invalid throughput parameters: {e}") + return_code = -1 + if args.exit_on_error: + break + continue + + try: + case_return_code, case_results = run_single_throughput_case( + bench_case, + filters, + num_instances, + cores_per_instance, + measurement_duration, + emergency_timeout, + args.bench_log_level, + full_logs, + ) + if case_return_code != 0: + return_code = case_return_code + if args.exit_on_error: + break + for entry in case_results: + entry["environment_name"] = environment_name + results.append(entry) + except KeyboardInterrupt: + return_code = -1 + break + except Exception as e: + logger.error(f"Throughput case failed: {e}") + return_code = -1 + if args.exit_on_error: + break + + full_result = { + "bench_cases": results, + "environment": {environment_name: env_info}, + } + return return_code, full_result diff --git a/sklbench/utils/core_assignment.py b/sklbench/utils/core_assignment.py new file mode 100644 index 00000000..b5fab549 --- /dev/null +++ b/sklbench/utils/core_assignment.py @@ -0,0 +1,127 @@ +# =============================================================================== +# Copyright 2024 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== + +from typing import Dict, List, Optional + +from .logger import logger + + +def parse_cpu_range(range_str: str) -> List[int]: + """Parse '0-11,24-35' into sorted list of individual core IDs.""" + cores = [] + for part in range_str.split(","): + part = part.strip() + if "-" in part: + start, end = part.split("-") + cores.extend(range(int(start), int(end) + 1)) + else: + cores.append(int(part)) + return sorted(cores) + + +def cores_to_range_str(cores: List[int]) -> str: + """Convert [0,1,2,3,8,9,10,11] to '0-3,8-11'.""" + if not cores: + return "" + cores = sorted(cores) + ranges = [] + start = cores[0] + end = cores[0] + for c in cores[1:]: + if c == end + 1: + end = c + else: + ranges.append(f"{start}-{end}" if start != end else str(start)) + start = c + end = c + ranges.append(f"{start}-{end}" if start != end else str(start)) + return ",".join(ranges) + + +def compute_core_assignments( + num_instances: int, + cores_per_instance: int, + numa_cpus_conf: Optional[Dict[int, str]] = None, +) -> List[str]: + """ + Returns list of physcpubind strings for numactl, one per instance. + + NUMA-aware: keeps each instance within a single NUMA node when possible. + Fallback: sequential blocks from core 0. + Raises ValueError if insufficient cores. + """ + total_needed = num_instances * cores_per_instance + + if numa_cpus_conf: + numa_cores = { + node: parse_cpu_range(cpu_str) + for node, cpu_str in numa_cpus_conf.items() + } + total_available = sum(len(c) for c in numa_cores.values()) + if total_needed > total_available: + raise ValueError( + f"Need {total_needed} cores ({num_instances} instances x " + f"{cores_per_instance} cores) but only {total_available} available" + ) + + assignments = [] + remaining = {node: list(cores) for node, cores in numa_cores.items()} + + for _ in range(num_instances): + assigned = False + for node in sorted(remaining.keys()): + if len(remaining[node]) >= cores_per_instance: + instance_cores = remaining[node][:cores_per_instance] + remaining[node] = remaining[node][cores_per_instance:] + assignments.append(cores_to_range_str(instance_cores)) + assigned = True + break + if not assigned: + # couldn't fit in a single node, take from multiple nodes + instance_cores = [] + for node in sorted(remaining.keys()): + take = min( + len(remaining[node]), + cores_per_instance - len(instance_cores), + ) + instance_cores.extend(remaining[node][:take]) + remaining[node] = remaining[node][take:] + if len(instance_cores) == cores_per_instance: + break + if len(instance_cores) < cores_per_instance: + raise ValueError("Insufficient cores for assignment") + logger.warning( + f"Instance assigned cores across NUMA nodes: " + f"{cores_to_range_str(instance_cores)}" + ) + assignments.append(cores_to_range_str(instance_cores)) + + return assignments + else: + from psutil import cpu_count + + available = cpu_count(logical=True) + if total_needed > available: + raise ValueError( + f"Need {total_needed} cores ({num_instances} instances x " + f"{cores_per_instance} cores) but only {available} available" + ) + assignments = [] + for i in range(num_instances): + start = i * cores_per_instance + end = start + cores_per_instance - 1 + assignments.append(f"{start}-{end}") + return assignments diff --git a/sklbench/utils/measurement.py b/sklbench/utils/measurement.py index a80da7fc..6e6d5eb6 100644 --- a/sklbench/utils/measurement.py +++ b/sklbench/utils/measurement.py @@ -65,6 +65,8 @@ def enrich_metrics( """Transforms raw performance and other results into aggregated metrics""" # time metrics res = bench_result.copy() + if "time[ms]" not in res: + return res if isinstance(res["time[ms]"], list): mean, std = box_filter(res["time[ms]"]) if include_performance_stability_metrics: From b42d09de34b79b560bbd67b8df22b0d905c026ad Mon Sep 17 00:00:00 2001 From: Anatoly Volkov Date: Wed, 20 May 2026 06:03:06 -0700 Subject: [PATCH 2/3] Update throughput arguments and core assignment --- sklbench/benchmarks/throughput_worker.py | 22 +--- sklbench/runner/arguments.py | 36 +------ sklbench/runner/throughput.py | 122 +++++------------------ sklbench/utils/barrier.py | 68 +++++++++++++ sklbench/utils/core_assignment.py | 110 ++++++++++---------- 5 files changed, 152 insertions(+), 206 deletions(-) create mode 100644 sklbench/utils/barrier.py diff --git a/sklbench/benchmarks/throughput_worker.py b/sklbench/benchmarks/throughput_worker.py index 36167c02..7acd6e37 100644 --- a/sklbench/benchmarks/throughput_worker.py +++ b/sklbench/benchmarks/throughput_worker.py @@ -1,5 +1,5 @@ # =============================================================================== -# Copyright 2024 Intel Corporation +# Copyright 2026 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,14 +18,13 @@ import inspect import json import socket -import sys import time from typing import Dict, List, Tuple from ..datasets import load_data from ..datasets.transformer import split_and_transform_data +from ..utils.barrier import recv_until from ..utils.bench_case import get_bench_case_value -from ..utils.common import convert_to_numpy from ..utils.config import bench_case_filter from ..utils.custom_types import BenchCase from ..utils.logger import logger @@ -39,16 +38,6 @@ ) -def barrier_wait(sock: socket.socket, msg_send: bytes, msg_expect_prefix: bytes): - """Send a message and block until response from parent.""" - sock.sendall(msg_send) - data = b"" - while not data.startswith(msg_expect_prefix): - chunk = sock.recv(1024) - if not chunk: - raise ConnectionError("Barrier socket closed unexpectedly") - data += chunk - def run_measurement_loop( func, args: tuple, measurement_duration: float @@ -184,12 +173,7 @@ def main(): continue # Wait for "go" signal from parent before each stage - data = b"" - while b"go" not in data: - chunk = sock.recv(1024) - if not chunk: - raise ConnectionError("Barrier socket closed unexpectedly") - data += chunk + recv_until(sock, b"go") method_name = available_methods[0] method_instance, data_args = get_method_and_args( diff --git a/sklbench/runner/arguments.py b/sklbench/runner/arguments.py index 63fcdf13..0a7c65fe 100644 --- a/sklbench/runner/arguments.py +++ b/sklbench/runner/arguments.py @@ -137,44 +137,14 @@ def add_runner_arguments(parser: argparse.ArgumentParser) -> argparse.ArgumentPa action="store_true", help="Interrupt runner and exit if last benchmark failed with error.", ) - # throughput mode arguments + # throughput mode parser.add_argument( "--throughput-mode", default=False, action="store_true", help="Run in throughput mode: multiple synchronized parallel instances " - "with CPU pinning via numactl.", - ) - parser.add_argument( - "--num-instances", - type=int, - default=None, - help="Number of parallel instances in throughput mode.", - ) - parser.add_argument( - "--cores-per-instance", - type=int, - default=None, - help="CPU cores per instance in throughput mode.", - ) - parser.add_argument( - "--measurement-duration", - type=float, - default=60.0, - help="Duration (seconds) for each measurement stage in throughput mode.", - ) - parser.add_argument( - "--emergency-timeout", - type=float, - default=3600.0, - help="Emergency subprocess timeout (seconds). Safety net only.", - ) - parser.add_argument( - "--throughput-full-logs", - default=False, - action="store_true", - help="Store per-iteration start_ts and duration_ms arrays in throughput results. " - "Disabled by default to reduce output size.", + "with CPU pinning via numactl. Configure via bench:num_instances, " + "bench:cores_per_instance, bench:measurement_duration in config.", ) # option to get parser description in Markdown table format for READMEs parser.add_argument( diff --git a/sklbench/runner/throughput.py b/sklbench/runner/throughput.py index 5e8aa4a2..830bdde1 100644 --- a/sklbench/runner/throughput.py +++ b/sklbench/runner/throughput.py @@ -1,5 +1,5 @@ # =============================================================================== -# Copyright 2024 Intel Corporation +# Copyright 2026 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,7 +16,6 @@ import argparse import json -import socket import subprocess import time from typing import Dict, List, Tuple, Union @@ -24,6 +23,7 @@ import numpy as np from tqdm import tqdm +from ..utils.barrier import accept_and_wait, create_server, send_all, wait_all from ..utils.bench_case import get_bench_case_name, get_bench_case_value from ..utils.common import custom_format, hash_from_json_repr from ..utils.core_assignment import compute_core_assignments @@ -37,60 +37,14 @@ def validate_throughput_args( ): if num_instances is None or num_instances < 1: raise ValueError( - "--num-instances is required and must be >= 1 in throughput mode" + "bench:num_instances is required and must be >= 1 in throughput mode" ) if cores_per_instance is None or cores_per_instance < 1: raise ValueError( - "--cores-per-instance is required and must be >= 1 in throughput mode" + "bench:cores_per_instance is required and must be >= 1 in throughput mode" ) if measurement_duration <= 0: - raise ValueError("--measurement-duration must be > 0") - - -def create_barrier_server() -> Tuple[socket.socket, int]: - """Create a TCP server socket on localhost with OS-assigned port.""" - server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server.bind(("localhost", 0)) - server.listen(128) - port = server.getsockname()[1] - return server, port - - -def wait_for_workers_ready( - server: socket.socket, num_instances: int, timeout: float -) -> List[socket.socket]: - """Accept connections from all workers and wait for 'ready' message.""" - server.settimeout(timeout) - connections = [] - for _ in range(num_instances): - conn, _ = server.accept() - data = b"" - while b"ready" not in data: - chunk = conn.recv(1024) - if not chunk: - raise ConnectionError("Worker disconnected before sending 'ready'") - data += chunk - connections.append(conn) - return connections - - -def send_go_to_all(connections: List[socket.socket]): - """Send 'go' signal to all workers.""" - for conn in connections: - conn.sendall(b"go") - - -def wait_for_workers_done(connections: List[socket.socket], timeout: float): - """Wait for 'done' message from all workers.""" - for conn in connections: - conn.settimeout(timeout) - data = b"" - while b"done" not in data: - chunk = conn.recv(1024) - if not chunk: - raise ConnectionError("Worker disconnected before sending 'done'") - data += chunk + raise ValueError("bench:measurement_duration must be > 0") def validate_sync_quality(instance_outputs: List[Dict], stage: str): @@ -137,7 +91,6 @@ def aggregate_stage_results( stage: str, measurement_duration: float, core_assignments: List[str], - full_logs: bool = False, ) -> Dict: """Aggregate per-instance results into a single stage result entry.""" instances = [] @@ -163,10 +116,6 @@ def aggregate_stage_results( } instance_entry.update(compute_instance_stats(durations, start_timestamps)) - if full_logs: - instance_entry["start_ts"] = start_timestamps - instance_entry["duration_ms"] = durations - instances.append(instance_entry) all_iterations.append(iters) @@ -196,19 +145,25 @@ def run_single_throughput_case( measurement_duration: float, emergency_timeout: float, log_level: str, - full_logs: bool = False, ) -> Tuple[int, List[Dict]]: """Run a single benchmark case in throughput mode.""" + # Preload dataset in parent process to avoid cache race condition + # when multiple workers try to download/generate and save simultaneously + from ..datasets import load_data + + logger.info("Preloading dataset in parent process to populate cache") + load_data(bench_case) + numa_conf = get_numa_cpus_conf() core_assignments = compute_core_assignments( - num_instances, cores_per_instance, numa_conf if numa_conf else None + num_instances, cores_per_instance, numa_conf or None ) logger.info( f"Core assignments for {num_instances} instances: {core_assignments}" ) - server, port = create_barrier_server() + server, port = create_server() logger.debug(f"Barrier server listening on localhost:{port}") bench_case_str = json.dumps(bench_case).replace(" ", "") @@ -236,33 +191,18 @@ def run_single_throughput_case( processes.append(proc) try: - # Wait for all workers to be ready (prep phase - unlimited, but bounded by emergency timeout) - connections = wait_for_workers_ready(server, num_instances, emergency_timeout) + connections = accept_and_wait(server, num_instances, b"ready", emergency_timeout) logger.info("All workers ready, starting measurement stages") - # Determine which stages exist - estimator_methods_training = get_bench_case_value( - bench_case, "algorithm:estimator_methods:training", None - ) - estimator_methods_inference = get_bench_case_value( - bench_case, "algorithm:estimator_methods:inference", None - ) - stages = [] - if estimator_methods_training is not None: - stages = ["training", "inference"] - else: - # default stages - stages = ["training", "inference"] - - stage_timeout = measurement_duration + 60 # extra time for one stage + stages = ["training", "inference"] + stage_timeout = measurement_duration + 60 for stage in stages: logger.info(f"Sending 'go' for {stage} stage") - send_go_to_all(connections) - wait_for_workers_done(connections, stage_timeout) + send_all(connections, b"go") + wait_all(connections, b"done", stage_timeout) logger.info(f"All workers done with {stage} stage") - # Close barrier connections for conn in connections: conn.close() @@ -326,7 +266,7 @@ def run_single_throughput_case( for stage in stages: stage_result = aggregate_stage_results( - instance_outputs, stage, measurement_duration, core_assignments, full_logs + instance_outputs, stage, measurement_duration, core_assignments ) if not stage_result: continue @@ -368,13 +308,6 @@ def run_throughput_benchmarks( env_info = get_environment_info() environment_name = args.environment_name or hash_from_json_repr(env_info) - # Resolve global defaults from CLI - default_num_instances = args.num_instances - default_cores_per_instance = args.cores_per_instance - default_measurement_duration = args.measurement_duration - default_emergency_timeout = args.emergency_timeout - full_logs = args.throughput_full_logs - results = [] return_code = 0 @@ -386,18 +319,14 @@ def run_throughput_benchmarks( ) ) - # Per-case config overrides CLI defaults - num_instances = get_bench_case_value( - bench_case, "bench:num_instances", default_num_instances - ) - cores_per_instance = get_bench_case_value( - bench_case, "bench:cores_per_instance", default_cores_per_instance - ) + # All throughput parameters come from bench_case config + num_instances = get_bench_case_value(bench_case, "bench:num_instances") + cores_per_instance = get_bench_case_value(bench_case, "bench:cores_per_instance") measurement_duration = get_bench_case_value( - bench_case, "bench:measurement_duration", default_measurement_duration + bench_case, "bench:measurement_duration", 60.0 ) emergency_timeout = get_bench_case_value( - bench_case, "bench:emergency_timeout", default_emergency_timeout + bench_case, "bench:emergency_timeout", 3600.0 ) try: @@ -420,7 +349,6 @@ def run_throughput_benchmarks( measurement_duration, emergency_timeout, args.bench_log_level, - full_logs, ) if case_return_code != 0: return_code = case_return_code diff --git a/sklbench/utils/barrier.py b/sklbench/utils/barrier.py new file mode 100644 index 00000000..c34e3254 --- /dev/null +++ b/sklbench/utils/barrier.py @@ -0,0 +1,68 @@ +# =============================================================================== +# Copyright 2026 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== + +"""TCP socket barrier for synchronizing throughput mode worker processes.""" + +import socket +from typing import List, Tuple + + +def create_server() -> Tuple[socket.socket, int]: + """Create a TCP server socket on localhost with OS-assigned port.""" + server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server.bind(("localhost", 0)) + server.listen(128) + port = server.getsockname()[1] + return server, port + + +def recv_until(sock: socket.socket, expected: bytes): + """Block until expected message is received on socket.""" + data = b"" + while expected not in data: + chunk = sock.recv(1024) + if not chunk: + raise ConnectionError( + f"Socket closed before receiving {expected!r}" + ) + data += chunk + + +def send_all(connections: List[socket.socket], message: bytes): + """Send message to all connections.""" + for conn in connections: + conn.sendall(message) + + +def accept_and_wait( + server: socket.socket, num_connections: int, expected: bytes, timeout: float +) -> List[socket.socket]: + """Accept num_connections and wait for expected message from each.""" + server.settimeout(timeout) + connections = [] + for _ in range(num_connections): + conn, _ = server.accept() + recv_until(conn, expected) + connections.append(conn) + return connections + + +def wait_all(connections: List[socket.socket], expected: bytes, timeout: float): + """Wait for expected message from all existing connections.""" + for conn in connections: + conn.settimeout(timeout) + recv_until(conn, expected) diff --git a/sklbench/utils/core_assignment.py b/sklbench/utils/core_assignment.py index b5fab549..324d2dd8 100644 --- a/sklbench/utils/core_assignment.py +++ b/sklbench/utils/core_assignment.py @@ -1,5 +1,5 @@ # =============================================================================== -# Copyright 2024 Intel Corporation +# Copyright 2026 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,6 +14,7 @@ # limitations under the License. # =============================================================================== +import os from typing import Dict, List, Optional from .logger import logger @@ -51,6 +52,22 @@ def cores_to_range_str(cores: List[int]) -> str: return ",".join(ranges) +def is_consecutive(cores: List[int]) -> bool: + """Check if a sorted list of core IDs is consecutive.""" + for i in range(1, len(cores)): + if cores[i] != cores[i - 1] + 1: + return False + return True + + +def get_numa_node_for_core(core_id: int, numa_cpus_conf: Dict[int, str]) -> int: + """Return NUMA node ID for a given core, or -1 if unknown.""" + for node, cpu_str in numa_cpus_conf.items(): + if core_id in parse_cpu_range(cpu_str): + return node + return -1 + + def compute_core_assignments( num_instances: int, cores_per_instance: int, @@ -59,69 +76,48 @@ def compute_core_assignments( """ Returns list of physcpubind strings for numactl, one per instance. - NUMA-aware: keeps each instance within a single NUMA node when possible. - Fallback: sequential blocks from core 0. - Raises ValueError if insufficient cores. + Uses only CPUs available to the current process (respects parent's taskset). + Splits available cores into sequential groups of cores_per_instance. + Warns if a group is non-consecutive or spans NUMA nodes. + Raises ValueError if the process doesn't have enough cores. """ + available_cores = sorted(os.sched_getaffinity(0)) total_needed = num_instances * cores_per_instance + if total_needed > len(available_cores): + raise ValueError( + f"Need {total_needed} cores ({num_instances} instances x " + f"{cores_per_instance} cores) but only {len(available_cores)} " + f"available to this process" + ) + + # Build NUMA lookup if available + numa_lookup = {} if numa_cpus_conf: - numa_cores = { - node: parse_cpu_range(cpu_str) - for node, cpu_str in numa_cpus_conf.items() - } - total_available = sum(len(c) for c in numa_cores.values()) - if total_needed > total_available: - raise ValueError( - f"Need {total_needed} cores ({num_instances} instances x " - f"{cores_per_instance} cores) but only {total_available} available" + for node, cpu_str in numa_cpus_conf.items(): + for core in parse_cpu_range(cpu_str): + numa_lookup[core] = node + + assignments = [] + for i in range(num_instances): + instance_cores = available_cores[ + i * cores_per_instance : (i + 1) * cores_per_instance + ] + + if not is_consecutive(instance_cores): + logger.warning( + f"Instance {i}: assigned non-consecutive cores " + f"{cores_to_range_str(instance_cores)}" ) - assignments = [] - remaining = {node: list(cores) for node, cores in numa_cores.items()} - - for _ in range(num_instances): - assigned = False - for node in sorted(remaining.keys()): - if len(remaining[node]) >= cores_per_instance: - instance_cores = remaining[node][:cores_per_instance] - remaining[node] = remaining[node][cores_per_instance:] - assignments.append(cores_to_range_str(instance_cores)) - assigned = True - break - if not assigned: - # couldn't fit in a single node, take from multiple nodes - instance_cores = [] - for node in sorted(remaining.keys()): - take = min( - len(remaining[node]), - cores_per_instance - len(instance_cores), - ) - instance_cores.extend(remaining[node][:take]) - remaining[node] = remaining[node][take:] - if len(instance_cores) == cores_per_instance: - break - if len(instance_cores) < cores_per_instance: - raise ValueError("Insufficient cores for assignment") + if numa_lookup: + nodes = set(numa_lookup.get(c, -1) for c in instance_cores) + if len(nodes) > 1: logger.warning( - f"Instance assigned cores across NUMA nodes: " - f"{cores_to_range_str(instance_cores)}" + f"Instance {i}: cores {cores_to_range_str(instance_cores)} " + f"span multiple NUMA nodes {sorted(nodes)}" ) - assignments.append(cores_to_range_str(instance_cores)) - return assignments - else: - from psutil import cpu_count + assignments.append(cores_to_range_str(instance_cores)) - available = cpu_count(logical=True) - if total_needed > available: - raise ValueError( - f"Need {total_needed} cores ({num_instances} instances x " - f"{cores_per_instance} cores) but only {available} available" - ) - assignments = [] - for i in range(num_instances): - start = i * cores_per_instance - end = start + cores_per_instance - 1 - assignments.append(f"{start}-{end}") - return assignments + return assignments From ec85b08a6af600e75d7db2f624e4e1bdc83b5d6c Mon Sep 17 00:00:00 2001 From: Anatoly Volkov Date: Wed, 20 May 2026 07:21:23 -0700 Subject: [PATCH 3/3] Update config --- configs/throughput/example.json | 244 ++++++++++++++++++++++++++------ 1 file changed, 202 insertions(+), 42 deletions(-) diff --git a/configs/throughput/example.json b/configs/throughput/example.json index 3ce9fd60..1499de94 100644 --- a/configs/throughput/example.json +++ b/configs/throughput/example.json @@ -1,70 +1,230 @@ { "INCLUDE": ["../common/sklearn.json"], "PARAMETERS_SETS": { - "common parameters": { - "data": { - "split_kwargs": { - "train_size": 8000, - "test_size": 2000, - "shuffle": true, + "throughput settings": { + "bench": { + "num_instances": 7, + "cores_per_instance": 8, + "measurement_duration": 30 + } + }, + "linear regression": { + "algorithm": { + "estimator": "LinearRegression", + "estimator_params": { + "fit_intercept": true, + "copy_X": true + } + } + }, + "random forest regressor": { + "algorithm": { + "estimator": "RandomForestRegressor", + "estimator_params": { + "n_estimators": 100, + "max_depth": 12, "random_state": 42 } + } + }, + "knn classifier kdtree": { + "algorithm": { + "estimator": "KNeighborsClassifier", + "estimator_params": { + "n_neighbors": 10, + "weights": "uniform", + "algorithm": "kd_tree", + "metric": "minkowski", + "p": 2 + } }, - "algorithm": { "device": "default" } + "data": { + "preprocessing_kwargs": { "normalize": "standard" } + } }, - "throughput settings": { - "bench": { - "num_instances": 4, - "cores_per_instance": 4, - "measurement_duration": 30 + "kmeans": { + "algorithm": { + "estimator": "KMeans", + "estimator_params": { + "n_clusters": 10, + "n_init": 1, + "max_iter": 100, + "tol": 1e-4, + "random_state": 42, + "algorithm": "lloyd" + }, + "estimator_methods": { + "inference": "predict" + } + }, + "data": { + "preprocessing_kwargs": { "normalize": "standard" } } }, - "datasets": { + "linear regression datasets": { "data": [ { - "source": "make_classification", + "source": "make_regression", "generation_kwargs": { - "n_classes": 2, - "n_samples": 10000, - "n_features": 64, - "n_informative": 32 + "n_samples": 500000, + "n_features": 50, + "n_informative": 10, + "noise": 20.0, + "random_state": 42 } + }, + { + "source": "make_regression", + "generation_kwargs": { + "n_samples": 100000, + "n_features": 2000, + "n_informative": 10, + "noise": 60.0, + "random_state": 42 + } + }, + { + "dataset": "year_prediction_msd", + "split_kwargs": { "train_size": 0.8, "test_size": 0.2 } + }, + { + "dataset": "hepmass", + "split_kwargs": { "train_size": 0.8, "test_size": 0.2 } } ] }, - "algorithms": [ - { - "algorithm": { - "estimator": "RandomForestClassifier", - "estimator_params": { "n_estimators": 50 } + "random forest datasets": { + "data": [ + { + "source": "make_regression", + "generation_kwargs": { + "n_samples": 100000, + "n_features": 20, + "n_informative": 10, + "noise": 10.0, + "random_state": 42 + } + }, + { + "source": "make_regression", + "generation_kwargs": { + "n_samples": 50000, + "n_features": 200, + "n_informative": 20, + "noise": 30.0, + "random_state": 42 + } + }, + { + "dataset": "year_prediction_msd", + "split_kwargs": { "train_size": 0.8, "test_size": 0.2 } + }, + { + "dataset": "medical_charges_nominal", + "split_kwargs": { "ignore": true } } - }, - { - "algorithm": { - "estimator": "KMeans", - "estimator_params": { - "n_clusters": 10, - "init": "random", - "algorithm": "lloyd", - "max_iter": 100 + ] + }, + "knn datasets": { + "data": [ + { + "source": "make_classification", + "generation_kwargs": { + "n_classes": 5, + "n_samples": 200000, + "n_features": 10, + "n_informative": 8, + "n_redundant": 2, + "n_repeated": 0, + "random_state": 42 + } + }, + { + "source": "make_classification", + "generation_kwargs": { + "n_classes": 5, + "n_samples": 50000, + "n_features": 50, + "n_informative": 20, + "n_redundant": 10, + "n_repeated": 0, + "random_state": 42 } + }, + { + "dataset": "letters", + "split_kwargs": { "ignore": true } + }, + { + "dataset": "codrnanorm", + "split_kwargs": { "ignore": true } } - }, - { - "algorithm": { - "estimator": "LinearRegression" + ] + }, + "kmeans datasets": { + "data": [ + { + "source": "make_blobs", + "generation_kwargs": { + "centers": 10, + "cluster_std": 4.0, + "n_samples": 500000, + "n_features": 20, + "random_state": 42 + } + }, + { + "source": "make_blobs", + "generation_kwargs": { + "centers": 10, + "cluster_std": 8.0, + "n_samples": 100000, + "n_features": 200, + "random_state": 42 + } + }, + { + "dataset": "creditcard", + "split_kwargs": { "ignore": true } + }, + { + "dataset": "hepmass", + "split_kwargs": { "train_size": 0.8, "test_size": 0.2 } } - } - ] + ] + } }, "TEMPLATES": { - "throughput_test": { + "linear_regression_throughput": { + "SETS": [ + "sklearn-ex[cpu] implementations", + "throughput settings", + "linear regression", + "linear regression datasets" + ] + }, + "random_forest_throughput": { + "SETS": [ + "sklearn-ex[cpu] implementations", + "throughput settings", + "random forest regressor", + "random forest datasets" + ] + }, + "knn_throughput": { + "SETS": [ + "sklearn-ex[cpu] implementations", + "throughput settings", + "knn classifier kdtree", + "knn datasets" + ] + }, + "kmeans_throughput": { "SETS": [ "sklearn-ex[cpu] implementations", - "common parameters", "throughput settings", - "datasets", - "algorithms" + "kmeans", + "kmeans datasets" ] } }