From f502efc4ea27df32eb4b39bda4a5e53e29fa50cb Mon Sep 17 00:00:00 2001 From: jurraca Date: Mon, 29 Sep 2025 14:22:39 +0200 Subject: [PATCH 1/4] use -p flag, remove batching --- kartograf/rpki/fetch.py | 55 +++++++------------- kartograf/rpki/parse.py | 107 ++++++++++++-------------------------- tests/test_rpki_parser.py | 1 - 3 files changed, 50 insertions(+), 113 deletions(-) diff --git a/kartograf/rpki/fetch.py b/kartograf/rpki/fetch.py index a2f2535..108721a 100644 --- a/kartograf/rpki/fetch.py +++ b/kartograf/rpki/fetch.py @@ -1,12 +1,9 @@ import subprocess import sys -from concurrent.futures import ThreadPoolExecutor, as_completed -import json from threading import Lock from pathlib import Path import requests -from tqdm import tqdm from kartograf.timed import timed from kartograf.util import ( @@ -99,11 +96,7 @@ def fetch_rpki_db(context): @timed def validate_rpki_db(context): - files = [path for path in Path(context.data_dir_rpki_cache).rglob('*') - if path.is_file() and ((path.suffix == ".roa") - or (path.name == ".roa"))] - - print(f"{len(files)} raw RKPI ROA files found.") + print("Validating RPKI ROAs") rpki_raw_file = 'rpki_raw.json' result_path = Path(context.out_dir_rpki) / rpki_raw_file @@ -115,46 +108,34 @@ def validate_rpki_db(context): with open(context.debug_log, 'a') as logs: logs.write("\n\n=== RPKI Validation ===\n") - def process_files_batch(batch): + def process_rpki_cache(): result = subprocess.run(["rpki-client", "-j", "-n", "-d", context.data_dir_rpki_cache, + "-p 16", "-P", context.epoch, - ] + tal_options + - ["-f"] + batch, # -f has to be last + ] + tal_options + [context.out_dir_rpki], capture_output=True, check=False) - if result.stderr and context.debug_log: - stderr_output = result.stderr.decode() + if context.debug_log: with debug_file_lock: with open(context.debug_log, 'a') as logs: - logs.write(stderr_output) + if result.stderr: + std_err = result.stderr.decode() + logs.write(std_err) + if result.stdout: + logs.write("\n== RPKI Validation Summary ==\n") + std_output = result.stdout.decode() + for line in std_output: + logs.write(line) return result.stdout - total = len(files) - batch_size = 250 - batches = [] - for i in range(0, total, batch_size): - batch = [str(f) for f in files[i:i + batch_size]] - batches.append(batch) - - total_batches = len(batches) - results = [] - with ThreadPoolExecutor() as executor: - futures = [executor.submit(process_files_batch, batch) for batch in batches] - for future in tqdm(as_completed(futures), total=total_batches): - result = future.result() - if result: - normalized = result.replace(b"\n}\n{\n\t", b"\n},\n{\n").decode('utf-8').strip() - results.append(normalized) - results_json = json.loads("[" + ",".join(results) + "]") - s = sorted(results_json, key=lambda result: result["hash_id"]) - - with open(result_path, 'w') as f: - json.dump(s, f) - - print(f"{len(results_json)} RKPI ROAs validated\nSaved to: {result_path.name}\nFile hash: {calculate_sha256(result_path)}") + process_rpki_cache() + default_out_dir = Path(context.out_dir_rpki) / "json" + default_out_dir.rename(result_path) + + print(f"RKPI ROAs validated and saved to {result_path}, file hash: {calculate_sha256(result_path)}") diff --git a/kartograf/rpki/parse.py b/kartograf/rpki/parse.py index c8f1fee..831380f 100644 --- a/kartograf/rpki/parse.py +++ b/kartograf/rpki/parse.py @@ -20,87 +20,47 @@ def parse_rpki(context): dups_count = 0 out_count = 0 - invalids = 0 - incompletes = 0 - not_roas = 0 with open(raw_input, "r") as dump: data = json.loads(dump.read()) - print(f'Parsing {len(data)} ROAs') - - for roa in data: - # Sometimes ROAs are incomplete and we have to skip them - key_list = [ - 'type', - 'validation', - 'aki', - 'ski', - 'vrps', - 'valid_until' - ] - if not all(key in roa for key in key_list): - incompletes += 1 - continue - - # We are only interested in ROAs - if roa['type'] != "roa": - not_roas += 1 + print(f'Parsing {data["metadata"]["roas"]} ROAs') + + for roa in data["roas"]: + asn = roa['asn'] + expiry = roa['expires'] + prefix = parse_pfx(roa['prefix']) + + # Bogon prefixes and ASNs are excluded since they can not + # be used for routing. + if not prefix or is_bogon_pfx(prefix) or is_bogon_asn(asn): + if context.debug_log: + with open(context.debug_log, 'a') as logs: + logs.write(f"RPKI: parser encountered an invalid entry: {prefix} with ASN {asn}\n") continue - # We are only interested in valid ROAs - if roa['validation'] != "OK": - invalids += 1 + if context.max_encode and is_out_of_encoding_range(asn, context.max_encode): continue - valid_until = roa['valid_until'] - valid_since = roa['valid_since'] - - for vrp in roa['vrps']: - asn = vrp['asid'] - prefix = parse_pfx(vrp['prefix']) - if not prefix: - if context.debug_log: - with open(context.debug_log, 'a') as logs: - logs.write(f"Could not parse prefix from line: {vrp['prefix']}") - continue - # Bogon prefixes and ASNs are excluded since they can not - # be used for routing. - if is_bogon_pfx(prefix) or is_bogon_asn(asn): - if context.debug_log: - with open(context.debug_log, 'a') as logs: - logs.write(f"RPKI: parser encountered an invalid IP network: {prefix}\n") - continue - - if context.max_encode and is_out_of_encoding_range(asn, context.max_encode): - continue - - # Multiple ROAs for the same prefix are possible and we need - # to decide if we update the entry or not - if output_cache.get(prefix): - dups_count += 1 - # If the new ASN is from a ROA that is valid for longer, - # we override the old entry with it - [old_asn, old_valid_until, old_valid_since] = output_cache[prefix] - if int(valid_until) > int(old_valid_until): - output_cache[prefix] = [asn, valid_until, valid_since] - # If the entries have the same validity period, we need to - # choose a different tie breaker - if int(valid_until) == int(old_valid_until): - # Prefer the ROA that was announced last - if int(valid_since) > int(old_valid_since): - output_cache[prefix] = [asn, valid_until, valid_since] - # If the ROAs were also announced at the same time, we - # fall back to using the lower ASN just to be - # deterministic - if int(valid_since) == int(old_valid_since): - if int(asn) < int(old_asn): - output_cache[prefix] = [asn, valid_until, valid_since] - else: - # No duplicate, add to cache - output_cache[prefix] = [asn, valid_until, valid_since] + # Multiple ROAs for the same prefix are possible and we need + # to decide if we update the entry or not + if output_cache.get(prefix): + dups_count += 1 + # If the new ASN is from a ROA that is valid for longer, + # we override the old entry with it + [old_asn, old_expiry] = output_cache[prefix] + if expiry > old_expiry: + output_cache[prefix] = [asn, expiry] + # If the entries have the same validity period, we need to + # choose a different tie breaker + if expiry == old_expiry: + if int(asn) < int(old_asn): + output_cache[prefix] = [asn, expiry] + else: + # No duplicate, add to cache + output_cache[prefix] = [asn, expiry] with open(rpki_res, "w") as asmap: - for prefix, [asn, _, _] in output_cache.items(): + for prefix, [asn, _] in output_cache.items(): line_out = f"{prefix} AS{asn}" asmap.write(line_out + '\n') @@ -110,6 +70,3 @@ def parse_rpki(context): print(f'Result entries written: {out_count}') print(f'Duplicates found: {dups_count}') - print(f'Invalids found: {invalids}') - print(f'Incompletes: {incompletes}') - print(f'Non-ROA files: {not_roas}') diff --git a/tests/test_rpki_parser.py b/tests/test_rpki_parser.py index 5b54fe9..3a0bad8 100644 --- a/tests/test_rpki_parser.py +++ b/tests/test_rpki_parser.py @@ -1,4 +1,3 @@ -import json import os from kartograf.rpki.parse import parse_rpki From c5be45375f25a128adc24493b2a7b11f342bd5ce Mon Sep 17 00:00:00 2001 From: jurraca Date: Tue, 14 Oct 2025 12:24:55 +0200 Subject: [PATCH 2/4] fix tests Removes test clauses which are no longer relevant, and test cases which no longer apply. --- tests/context.py | 14 ++++---- tests/test_rpki_parser.py | 67 ++------------------------------------- 2 files changed, 9 insertions(+), 72 deletions(-) diff --git a/tests/context.py b/tests/context.py index ea5b48d..ce0b792 100644 --- a/tests/context.py +++ b/tests/context.py @@ -32,16 +32,16 @@ def load_rpki_csv_to_json(context, fixtures_path): with open(csv_path, 'r') as csvfile: reader = csv.DictReader(csvfile) for row in reader: - vrps = [{"prefix": row["prefix"], "asid": row["asid"], "maxlen": row["maxlen"]}] - del row["prefix"] - del row["asid"] - del row["maxlen"] - row["vrps"] = vrps - rpki_data.append(row) + vrps = {"prefix": row["prefix"], "asn": row["asid"], "maxlen": row["maxlen"], "expires": row["valid_until"]} + # del row["prefix"] + # del row["asid"] + # del row["maxlen"] + # row["vrps"] = vrps + rpki_data.append(vrps) output_path = Path(context.out_dir_rpki) / 'rpki_raw.json' with open(output_path, 'w') as jsonfile: - json.dump(rpki_data, jsonfile, indent=2) + json.dump({"metadata": {"roas": 24}, "roas": rpki_data}, jsonfile, indent=2) def load_irr_fixtures(context, fixtures_path): for file in irr_fixtures(): diff --git a/tests/test_rpki_parser.py b/tests/test_rpki_parser.py index 3a0bad8..581cf36 100644 --- a/tests/test_rpki_parser.py +++ b/tests/test_rpki_parser.py @@ -35,57 +35,9 @@ def test_roa_validations(tmp_path, capsys): captured = capsys.readouterr() - assert len(final_lines) == 10, "Should have found 10 valid ROAs" - assert "Result entries written: 10" in captured.out + assert len(final_lines) == 12, "Should have found 12 valid ROAs" + assert "Result entries written: 12" in captured.out assert "Duplicates found: 5" in captured.out - assert "Invalids found: 1" in captured.out - assert "Incompletes: 0" in captured.out - assert "Non-ROA files: 1" in captured.out - -def test_roa_incompletes(tmp_path, capsys): - ''' - Test that the ROA file has missing entries. - The data is mocked here and written to a json file. - ''' - epoch = "111111112" - context = create_test_context(tmp_path, epoch) - test_data = [ - { - "type": "roa", - "validation": "OK", - "ski": "some-ski", - "vrps": [{"prefix": "192.0.2.0/24", "asid": "64496", "maxlen": "24"}], - "valid_until": "1234567890", - "valid_since": "1234567880" - }, - { - "type": "roa", - "validation": "OK", - "ski": "some-ski", - "vrps": [{"prefix": "198.51.100.0/24", "asid": "64497", "maxlen": "24"}], - "valid_until": "1234567890", - "valid_since": "1234567880" - } - ] - - # Write test data to rpki_raw.json - with open(os.path.join(context.out_dir_rpki, "rpki_raw.json"), "w") as f: - json.dump(test_data, f) - - parse_rpki(context) - - # Check that rpki_final.txt was created - final_path = os.path.join(context.out_dir_rpki, "rpki_final.txt") - assert os.path.exists(final_path), "rpki_final.txt should exist" - - # Count entries in final output - with open(final_path, "r") as f: - final_lines = f.readlines() - - assert len(final_lines) == 0, "No rows should be written" - captured = capsys.readouterr() - assert "Incompletes: 2" in captured.out - def test_roa_valid_until_fallback(tmp_path): '''Test ROA selection falls back to later valid_until''' @@ -102,21 +54,6 @@ def test_roa_valid_until_fallback(tmp_path): assert not any("101.0.1.0/24 AS11101" in e for e in entries), "ROA with earlier valid_until should not be selected" -def test_roa_valid_since_fallback(tmp_path): - '''Test ROA selection falls back to later valid_since when valid_until matches''' - epoch = "111111111" - context = create_test_context(tmp_path, epoch) - setup_test_data(context) - parse_rpki(context) - - final_path = os.path.join(context.out_dir_rpki, "rpki_final.txt") - with open(final_path, "r") as f: - entries = [line.strip() for line in f.readlines()] - - assert "102.0.100.0/24 AS11104" in entries, "ROA with later valid_since should be selected" - assert not any("102.0.100.0/24 AS11103" in e for e in entries), "ROA with earlier valid_since should not be selected" - - def test_roa_asn_fallback(tmp_path): '''Test ROA selection falls back to lower ASN when timestamps match''' epoch = "111111111" From dc403a682ea6617636a4cdb29fe991d120d29f6f Mon Sep 17 00:00:00 2001 From: jurraca Date: Fri, 21 Nov 2025 14:59:14 +0100 Subject: [PATCH 3/4] get cpu_count from host --- kartograf/rpki/fetch.py | 4 +++- kartograf/util.py | 8 ++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/kartograf/rpki/fetch.py b/kartograf/rpki/fetch.py index 108721a..cf21eeb 100644 --- a/kartograf/rpki/fetch.py +++ b/kartograf/rpki/fetch.py @@ -9,6 +9,7 @@ from kartograf.util import ( calculate_sha256, calculate_sha256_directory, + get_threads, ) TAL_URLS = { @@ -101,6 +102,7 @@ def validate_rpki_db(context): result_path = Path(context.out_dir_rpki) / rpki_raw_file tal_options = [item for path in data_tals(context) for item in ('-t', path)] + threads = get_threads() debug_file_lock = Lock() @@ -114,7 +116,7 @@ def process_rpki_cache(): "-n", "-d", context.data_dir_rpki_cache, - "-p 16", + f"-p {threads}", "-P", context.epoch, ] + tal_options + [context.out_dir_rpki], diff --git a/kartograf/util.py b/kartograf/util.py index 57179b0..7b3f84d 100644 --- a/kartograf/util.py +++ b/kartograf/util.py @@ -173,3 +173,11 @@ def get_root_network(pfx): if root_net: return int(root_net, 16) return None + + +def get_threads(): + """ + Returns the count of logical CPUs on the host machine. + If None are found (for some reason), defaults to 4. + """ + return os.cpu_count() or 4 From fbeb1eebf336c995697023c201898ff99e1d38e7 Mon Sep 17 00:00:00 2001 From: jurraca Date: Sun, 23 Nov 2025 15:39:36 +0100 Subject: [PATCH 4/4] use new get_threads in merge.py --- kartograf/merge.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/kartograf/merge.py b/kartograf/merge.py index 1dc779a..3cdf1e6 100644 --- a/kartograf/merge.py +++ b/kartograf/merge.py @@ -2,13 +2,12 @@ from pathlib import Path import ipaddress import math -import os import shutil from types import SimpleNamespace import pandas as pd from kartograf.timed import timed -from kartograf.util import get_root_network +from kartograf.util import get_root_network, get_threads class BaseNetworkIndex: @@ -163,7 +162,7 @@ def pick_chunk_size(n_rows: int, workers: int | None = None, min_chunk: int = 5, max_chunk: int = 200_000) -> int: if workers is None: - workers = os.cpu_count() or 4 + workers = get_threads() chunk = math.ceil(n_rows / workers) return max(min_chunk, min(max_chunk, chunk))