Not an issue, but you can take a look. #9

Open
MaurUppi opened this issue Jan 28, 2024 · 0 comments
MaurUppi commented Jan 28, 2024

The location_ipfire_db_reader library, written by svaningelgem, can read IPFire's location.db.
So I used it, together with an HTTP API, as two references for cross-checking the source data.

The source data to be checked is dbip-country-lite-2024-01.csv.
The references are IPFire's location.db and the ip-api.com batch query API (100 IPs per request, at most 15 requests per minute).

Long story short, here is the result.

Total number of IPv4 records in the data source: 308422
Sampling size: 384
Processed 384/384 IPs
Accuracy (Cross-checked w/IPFire location.db): 60.677083333333336%
Processed 100/384 IPs
Processed 200/384 IPs
Processed 300/384 IPs
Processed 384/384 IPs
Accuracy (Cross-checked w/IP-API.com HTTP-API): 88.54166666666666%

Assuming both references map CIDR ranges to country codes with high accuracy, the initial conclusions could be:

  1. The IP-API.com data is closely aligned with the dbip-country database (about 89% agreement).
  2. In contrast, location.db agrees with the dbip-country database on only about 61% of the sampled IPs, so roughly 40% of them differ.

So, is one of the references lying?
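
Before reading too much into the gap, it helps to see how much sampling noise a sample of 384 IPs carries. The sketch below is not part of the original script; it assumes the match counts 233 and 340, which are back-calculated from the reported percentages (233/384 and 340/384), and uses the plain normal-approximation interval at 95% confidence.

```python
import math

def proportion_ci(matches, n, z=1.96):
    """Normal-approximation (Wald) confidence interval for a proportion."""
    p = matches / n
    half_width = z * math.sqrt(p * (1 - p) / n)
    return p - half_width, p + half_width

# Match counts are an assumption, recovered from the reported percentages:
# 233/384 = 60.677...% and 340/384 = 88.541...%
for label, matches in [("IPFire location.db", 233), ("IP-API.com", 340)]:
    low, high = proportion_ci(matches, 384)
    print(f"{label}: {low:.1%} to {high:.1%}")
```

Under these assumptions the two intervals (roughly 56% to 66% and 85% to 92%) do not overlap, so the difference between the two references is unlikely to be sampling noise alone.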

The Python code is below. If you have a free moment, please take a look and check whether there are any logical faults.


import math
import time
import pandas as pd
import random
import ipaddress
import os
from location_ipfire_db_reader import LocationDatabase
from location_ipfire_db_reader.exceptions import IPAddressError
import requests

source_file_path = os.getenv('SOURCE_FILE_PATH', 'D:\\Dev\\GeoLocationData\\GeoDB_merger\\source\\dbip-country-lite-2024-01.csv')

log_file_path_ipfire = os.getenv('LOG_FILE_PATH_IPFIRE', 'D:\\Dev\\GeoLocationData\\GeoDB_merger\\log\\random_pickIPaddr_checkwith_IPFireDB.log')
log_file_path_IP_API = os.getenv('LOG_FILE_PATH_IP_API', 'D:\\Dev\\GeoLocationData\\GeoDB_merger\\log\\random_pickIPaddr_checkwith_IP-API.log')

requests_per_minute = int(os.getenv('REQUESTS_PER_MINUTE', '15'))


def is_ipv4(address):
    try:
        ipaddress.IPv4Address(address)
        return True
    except ipaddress.AddressValueError:
        return False

def calculate_sample_size(total, confidence_level, margin_error, p=0.5):
    """Return the sample size from Cochran's formula with finite population correction."""
    z_dict = {90: 1.645, 95: 1.96, 99: 2.576}
    if not isinstance(total, int) or total <= 0:
        raise ValueError("Total must be a positive integer")
    if confidence_level not in z_dict:
        raise ValueError("Confidence level must be 90, 95, or 99.")
    if not isinstance(margin_error, (int, float)) or margin_error <= 0:
        raise ValueError("Margin error must be a positive number.")
    if not isinstance(p, (int, float)) or not (0 <= p <= 1):
        raise ValueError("Expected population proportion 'p' must be a number between 0 and 1.")

    z = z_dict[confidence_level]

    # Sample size for an infinite population
    sample_size = ((z**2) * p * (1-p)) / (margin_error**2)
    # Finite population correction
    sample_size = sample_size / (1 + ((sample_size - 1) / total))

    # Return the sample size, rounded up to the nearest integer
    return math.ceil(sample_size)

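def calculate_accuracy(log_file_path):
    match_count = 0
    total_count = 0
    with open(log_file_path, 'r') as file:
        for line in file:
            line = line.strip()
            if not line:
                continue
            total_count += 1
            # The match flag is the last field. Testing "', O' in line" would also hit
            # mismatched rows whose country code starts with "O" (e.g. "OM").
            if line.endswith(', O'):
                match_count += 1
    accuracy = (match_count / total_count) * 100 if total_count > 0 else 0
    return accuracy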

def ReadSourceCSVfile(csv_file_path):
    data = []
    with open(csv_file_path, 'r') as file:
        for line in file:
            start_ip, end_ip, country = line.strip().split(',')
            if is_ipv4(start_ip) and is_ipv4(end_ip):
                row = {
                    'start_ip': start_ip,
                    'end_ip': end_ip,
                    'country': country,
                    'start_ip_int': int(ipaddress.IPv4Address(start_ip)),
                    'end_ip_int': int(ipaddress.IPv4Address(end_ip))
                }
                data.append(row)
    df_ipv4 = pd.DataFrame(data)
    return df_ipv4


def generate_random_ips(df, sample_size):
    all_ips = []
    for _ in range(sample_size):
        # Every CSV row is equally likely to be picked, no matter how many addresses it covers
        random_row = df.sample().iloc[0]
        start_ip_int = random_row['start_ip_int']
        end_ip_int = random_row['end_ip_int']
        # Pick one address uniformly inside the chosen range (both ends inclusive)
        random_ip_int = random.randint(start_ip_int, end_ip_int)
        all_ips.append(str(ipaddress.IPv4Address(random_ip_int)))
    return all_ips


def check_country_with_ipfire_db(df, ips, log_file_path_ipfire, db):
    match_count = 0
    processed = 0

    with open(log_file_path_ipfire, 'w') as log_file:
        for ip in ips:
            processed += 1
            ip_int = int(ipaddress.IPv4Address(ip))
            matching_rows = df[(df['start_ip_int'] <= ip_int) & (df['end_ip_int'] >= ip_int)]
            expected_country_code = matching_rows.iloc[0]['country'] if not matching_rows.empty else 'N/A'

            try:
                actual_country_code = db.find_country(ip)
            except IPAddressError:
                actual_country_code = 'N/A'

            match = 'O' if expected_country_code == actual_country_code else 'X'
            if match == 'O':
                match_count += 1

            result_line = f"{ip}, {expected_country_code}, {actual_country_code}, {match}"
            log_file.write(result_line + '\n')

            log_file.flush() 

        print(f"Processed {processed}/{len(ips)} IPs")

    return match_count, processed

def check_country_with_ip_api(df, ips, log_file_path, batch_size=100):
    match_count = 0
    processed = 0

    with open(log_file_path, 'w') as log_file:
        for batch_start in range(0, len(ips), batch_size):
            batch_ips = ips[batch_start:batch_start + batch_size]

            response = requests.post('http://ip-api.com/batch?fields=status,countryCode,query', json=batch_ips)
            
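            if response.status_code != 200:
                # IPs in a failed batch are not logged, so they drop out of the accuracy total
                print(f"Request failed with status code: {response.status_code}")
                # Back off before the next batch instead of retrying immediately
                time.sleep(60 / requests_per_minute)
                continue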

            rate_limit_remaining = response.headers.get('X-Rl')
            rate_limit_reset = response.headers.get('X-Ttl')

            if rate_limit_remaining is not None:
                rate_limit_remaining = int(rate_limit_remaining)
            else:
                print("Warning: 'X-Rl' header is missing. Defaulting to 0.")
                rate_limit_remaining = 0

            if rate_limit_reset is not None:
                rate_limit_reset = int(rate_limit_reset)
            else:
                print("Warning: 'X-Ttl' header is missing. Defaulting to 60.")
                rate_limit_reset = 60

            if rate_limit_remaining == 0:
                time.sleep(rate_limit_reset)
            else:
                # Calculate the delay needed to not exceed 15 requests per minute
                delay = 60 / requests_per_minute
                time.sleep(delay)

            try:
                batch_results = response.json()
            except ValueError as e:
                print(f"Error parsing JSON response: {e}")
                continue

            for ip, res in zip(batch_ips, batch_results):
                processed += 1
                if res['status'] == 'success':
                    ip_int = int(ipaddress.IPv4Address(ip))
                    matching_rows = df[(df['start_ip_int'] <= ip_int) & (df['end_ip_int'] >= ip_int)]
                    if not matching_rows.empty:
                        expected_country_code = matching_rows.iloc[0]['country']
                    else:
                        expected_country_code = 'N/A'
                    actual_country_code = res.get('countryCode', 'N/A')
                    match = 'O' if expected_country_code == actual_country_code else 'X'
                    if match == 'O':
                        match_count += 1
                else:
                    expected_country_code = 'N/A'
                    actual_country_code = 'N/A'
                    match = 'X'

                result_line = f"{ip}, {expected_country_code}, {actual_country_code}, {match}"
                log_file.write(result_line + '\n')
                log_file.flush() 

            print(f"Processed {processed}/{len(ips)} IPs")

    return match_count, processed



def main():
    # Read the CSV file and calculate the sample size
    df_ipv4 = ReadSourceCSVfile(source_file_path)
    total_ipv4_rows = len(df_ipv4)
    print(f"Total number of IPv4 records in the data source: {total_ipv4_rows}")

    confidence_level = 95
    margin_error = 0.05
    sample_size = calculate_sample_size(total_ipv4_rows, confidence_level, margin_error)
    print(f"Sampling size: {sample_size}")

    random_ips = generate_random_ips(df_ipv4, sample_size)

    # Initialize LocationDatabase instance
    db = LocationDatabase('location.db')

    # Check countries with IPFire DB
    check_country_with_ipfire_db(df_ipv4, random_ips, log_file_path_ipfire, db)
    accuracy_ipfire = calculate_accuracy(log_file_path_ipfire)
    print(f"Accuracy (Cross-checked w/IPFire location.db): {accuracy_ipfire}%")

    # Check countries with IP-API
    check_country_with_ip_api(df_ipv4, random_ips, log_file_path_IP_API)
    accuracy_ip_api = calculate_accuracy(log_file_path_IP_API)
    print(f"Accuracy (Cross-checked w/IP-API.com HTTP-API): {accuracy_ip_api}%")

if __name__ == "__main__":
    main()
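
One point that may be worth checking in the script above: `generate_random_ips` picks a CSV row uniformly, so a tiny range and a huge range are equally likely to contribute a sample. The accuracy is then a per-record figure, not a per-address figure. Whether that is a fault depends on what you want to measure. If per-address accuracy is the goal, a minimal sketch could weight each range by its size. The function name `sample_ips_weighted` and the two example ranges are hypothetical, and the sketch uses only the standard library instead of pandas.

```python
import ipaddress
import random

def sample_ips_weighted(ranges, k, seed=None):
    """Pick k IPs so that every address covered by `ranges` is equally likely.

    `ranges` is a list of (start_int, end_int) tuples, inclusive on both ends.
    """
    rng = random.Random(seed)
    # Weight each range by the number of addresses it contains
    weights = [end - start + 1 for start, end in ranges]
    chosen = rng.choices(ranges, weights=weights, k=k)
    return [str(ipaddress.IPv4Address(rng.randint(start, end))) for start, end in chosen]

# Hypothetical ranges for illustration: one /8 and one /24
ranges = [
    (int(ipaddress.IPv4Address("10.0.0.0")), int(ipaddress.IPv4Address("10.255.255.255"))),
    (int(ipaddress.IPv4Address("192.0.2.0")), int(ipaddress.IPv4Address("192.0.2.255"))),
]
print(sample_ips_weighted(ranges, 5, seed=1))
```

With row-uniform sampling, about half of the picks would come from the /24. With size weighting, nearly all come from the /8, because it holds 65536 times as many addresses.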

The log output files.

random_pickIPaddr_checkwith_IPFireDB.log
random_pickIPaddr_checkwith_IP-API.log
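
Each log line has the form `ip, expected, actual, match`, and the script writes `N/A` when a lookup fails, then counts that row as a mismatch. To see how much of the gap comes from real disagreement and how much from unresolved lookups, the logs could be summarized as in the sketch below. The function name `summarize_log` and the sample lines are made up for illustration and are not taken from the attached logs.

```python
def summarize_log(lines):
    """Count matches, mismatches, and unresolved rows in 'ip, expected, actual, match' lines."""
    counts = {"match": 0, "mismatch": 0, "unresolved": 0}
    for line in lines:
        parts = [part.strip() for part in line.split(",")]
        if len(parts) != 4:
            continue  # skip blank or malformed lines
        _ip, expected, actual, _flag = parts
        if "N/A" in (expected, actual):
            counts["unresolved"] += 1
        elif expected == actual:
            counts["match"] += 1
        else:
            counts["mismatch"] += 1
    return counts

# Hypothetical log lines for illustration
sample = [
    "192.0.2.1, US, US, O",
    "192.0.2.2, OM, US, X",
    "192.0.2.3, N/A, N/A, X",
    "",
]
counts = summarize_log(sample)
resolved = counts["match"] + counts["mismatch"]
print(counts)
print(f"Accuracy over resolved rows: {counts['match'] / resolved:.1%}")
```

For a real log file, pass the open file object instead of the `sample` list.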

Hope I'm not flooding the thread.

I also have another source data file, merged from three GeoIP CSV files (ipinfo's country.csv, IP2LOCATION-LITE-DB1.CSV, and dbip-country-lite-2024-01.csv).
The merge logic is a different story, and the result has yet to meet my expectations.

The key point is that the accuracy results differ from the previous run.

python.exe .\random_pickup_IPaddr_CheckTwoRefSource.py

Total number of IPv4 records in the data source: 630324
Sampling size: 384
Processed 384/384 IPs
Accuracy (Cross-checked w/IPFire location.db): 77.60416666666666%
Processed 100/384 IPs
Processed 200/384 IPs
Processed 300/384 IPs
Processed 384/384 IPs
Accuracy (Cross-checked w/IP-API.com HTTP-API): 80.20833333333334%

merged_ip_data.zip
