# -*- coding: utf-8 -*-
from dns import resolver, reversename
from urllib import parse
import pandas as pd
import numpy as np
import ipaddress
import geocoder
import math
import os
import re
"""
url2features.dns: Features based on the DNS records
This module borrows heavily from URL Feature Extractor
by Lucas Ayres : https://www.lucasayres.com.br
Code taken and modified from
https://github.com/lucasayres/url-feature-extractor
"""
########################################################################################
[docs]def dns_features(df, columns, add_prefix=True):
"""
Given a pandas dataframe and a set of column names.
calculate the DNS summary features and add them.
"""
rez = df.copy()
for col in columns:
rez = add_dns_features(rez, col, add_prefix)
return rez
########################################################################################
[docs]def add_dns_features(df, col, add_prefix=True):
"""
Given a pandas dataframe and a column name containing a URL
calculate the DNS features
"""
def dns_feature_gen(x, col):
if x[col]!=x[col]:
ns = 0
mx = 0
spf = 0
ptr = 0
country = ""
else:
url = (x[col])
url_parts = split_url_into_parts(url)
ns = count_name_servers(url_parts)
mx = count_mx_servers(url_parts)
spf = 0
ptr = get_ptr(url_parts)
country = get_country(url_parts)
return ns, mx, ptr, country
if add_prefix:
col_names = [ col+'_dns_ns', col+'_dns_mx', col+'_dns_ptr', col+'_dns_country' ]
else:
col_names = [ 'dns_ns', 'dns_mx', 'dns_ptr', 'dns_country' ]
df[ col_names ] = df.apply(dns_feature_gen, col=col, axis=1, result_type="expand")
return df
########################################################################################
[docs]def valid_ip(host):
"""Return if the domain has a valid IP format (IPv4 or IPv6)."""
try:
ipaddress.ip_address(host)
return True
except Exception:
return False
########################################################################################
[docs]def count_ips(url_parts):
"""Return the number of resolved IPs (IPv4)."""
if valid_ip(url_parts['host']):
return 1
try:
answers = resolver.query(url_parts['host'], 'A')
return len(answers)
except Exception:
return '?'
########################################################################################
[docs]def count_name_servers(url_parts):
"""Return number of NameServers (NS) resolved."""
count = 0
if count_ips(url_parts):
try:
answers = resolver.query(url_parts['host'], 'NS')
return len(answers)
except (resolver.NoAnswer, resolver.NXDOMAIN):
split_host = url_parts['host'].split('.')
while len(split_host) > 0:
split_host.pop(0)
supposed_domain = '.'.join(split_host)
try:
answers = resolver.query(supposed_domain, 'NS')
count = len(answers)
break
except Exception:
count = 0
except Exception:
count = 0
return count
########################################################################################
[docs]def count_mx_servers(url_parts):
"""Return Number of Resolved MX Servers."""
count = 0
if count_ips(url_parts):
try:
answers = resolver.query(url_parts['host'], 'MX')
return len(answers)
except (resolver.NoAnswer, resolver.NXDOMAIN):
split_host = url_parts['host'].split('.')
while len(split_host) > 0:
split_host.pop(0)
supposed_domain = '.'.join(split_host)
try:
answers = resolver.query(supposed_domain, 'MX')
count = len(answers)
break
except Exception:
count = 0
except Exception:
count = 0
return count
########################################################################################
[docs]def get_ptr(url_parts):
"""Return PTR associated with IP."""
try:
if valid_ip(url_parts['host']):
ip = url_parts['host']
else:
ip = resolver.query(url_parts['host'], 'A')
ip = ip[0].to_text()
if ip:
r = reversename.from_address(ip)
result = resolver.query(r, 'PTR')[0].to_text()
return result
else:
return '?'
except Exception:
return '?'
########################################################################################
[docs]def get_country(url_parts):
"""Return the country associated with IP."""
try:
if valid_ip(url_parts['host']):
ip = url_parts['host']
else:
ip = resolver.query(url_parts['host'], 'A')
ip = ip[0].to_text()
if ip:
coded = geocoder.ip(ip)
return coded.country
else:
return '?'
except Exception:
return '?'
########################################################################################
[docs]def split_url_into_parts(url):
"""Split URL into: protocol, host, path, params, query and fragment."""
if not parse.urlparse(url.strip()).scheme:
url = 'http://' + url
protocol, host, path, params, query, fragment = parse.urlparse(url.strip())
result = {
'url': host + path + params + query + fragment,
'protocol': protocol,
'host': host,
'path': path,
'params': params,
'query': query,
'fragment': fragment
}
return result