343 lines
9.0 KiB
Python
343 lines
9.0 KiB
Python
import ipaddress
import os
import random
import socket
import ssl
import time
from datetime import datetime

import geoip2.database
import requests
import yaml
from flask import Flask, render_template, request
|
|
|
|
# Templates are loaded from the process's current working directory.
template_dir = os.path.abspath('.')
app = Flask(__name__, template_folder=template_dir)

# User-Agent substrings identifying command-line clients; the page routes
# answer these with plain text / YAML instead of rendered HTML.
cli = [
    "curl",
    "HTTPie",
    "httpie-go",
    "Wget",
    "fetch libfetch",
    "Go",
    "Go-http-client",
    "ddclient",
    "Mikrotik",
    "xh",
]

# Accent colours; one is picked at random for every rendered page.
colors = [
    '#2488bf',
    '#d84d3d',
    '#f39700',
    '#4caf50',
]

# Address ranges (CIDR notation) that the lookup routes refuse to report on.
private = [
    '10.0.0.0/8',
    '172.16.0.0/12',
    '192.168.0.0/16',
    '169.254.0.0/16',
    '127.0.0.0/8',
    'fc00::/7',
]

# Running count of handled lookups, exposed by the /metrics route.
queries = 0
|
|
|
|
|
|
def to_yaml(value):
    """Serialize *value* to a block-style (non-flow) YAML string."""
    rendered = yaml.dump(value, default_flow_style=False)
    return rendered


# Make the serializer available inside Jinja templates as ``to_yaml``.
app.jinja_env.filters['to_yaml'] = to_yaml
|
|
|
|
|
|
def get_name(host):
    """Return the site title: ``$GEOIPNAME`` when it is set, else *host*."""
    # os.environ never holds None, so a lookup with a default is equivalent
    # to testing the variable against None first.
    return os.environ.get('GEOIPNAME', host)
|
|
|
|
|
|
def gethost(ip):
    """Return the reverse-DNS name for *ip*, falling back to *ip* itself.

    The lookup is best effort: when the address cannot be parsed or
    resolved, the input value is returned unchanged so callers always get
    something printable.
    """
    try:
        return socket.getnameinfo((ip, 0), 0)[0]
    except (OSError, TypeError, ValueError):
        # OSError covers socket.gaierror/herror; TypeError/ValueError cover
        # malformed arguments. Anything else (e.g. KeyboardInterrupt) is
        # deliberately allowed to propagate.
        return ip
|
|
|
|
|
|
def getip(ip):
    """Resolve a hostname to an IPv4 address.

    A value is treated as a hostname only when it contains at least one
    letter and a dot; anything else (dotted-quad IPv4, IPv6 literals, bare
    words) is returned unchanged.

    Returns the resolved address string, the untouched input, or the
    integer ``0`` when resolution fails (callers test for ``== 0``).
    """
    looks_like_hostname = '.' in ip and any(c.isalpha() for c in ip)
    if not looks_like_hostname:
        return ip
    try:
        return socket.gethostbyname(ip)
    except (OSError, ValueError):
        # OSError: resolver failure (socket.gaierror / herror).
        # ValueError: name rejected before lookup (e.g. IDNA encoding error).
        return 0
|
|
|
|
|
|
def getclientip(request):
    """Return the client's IP address for *request*.

    Sources are tried in order:

    1. the first entry of the ``X-Forwarded-For`` header,
    2. the ``CF-Connecting-IP`` header,
    3. ``request.remote_addr``.

    ``X-Forwarded-For`` may carry a comma-separated chain
    ("client, proxy1, proxy2"); only the first, whitespace-stripped entry
    is returned so the result is a single address rather than the whole
    chain.

    NOTE(review): both headers are client-controlled unless a trusted proxy
    overwrites them -- confirm the deployment sits behind such a proxy.
    """
    forwarded_values = request.headers.getlist('X-Forwarded-For')
    if forwarded_values:
        first_hop = forwarded_values[0].split(',')[0].strip()
        if first_hop:
            return first_hop

    cloudflare_ip = request.headers.get('CF-Connecting-IP')
    if cloudflare_ip:
        return cloudflare_ip

    return request.remote_addr
|
|
|
|
|
|
def gethead(request, ip):
    """Build a dict describing the client from the request headers.

    The *ip* argument is ignored: the address is always re-derived from
    *request* via ``getclientip``.
    """
    ip = getclientip(request)
    headers = request.headers

    xff = headers.get("X-Forwarded-For")
    forwarded = xff.split(',')[0] if xff else "None, there is no forwarded IP"

    # NOTE: the value is split on ',' again below, so when the header is
    # absent the emitted "x-forwarded-for" entry is just "None".
    head = {
        "ip_address": ip,
        "remote_host": gethost(ip),
        "port": request.environ.get('REMOTE_PORT'),
        "user-agent": headers.get("User-Agent"),
        "mime_type": headers.get("Accept"),
        "language": headers.get("Accept-Language"),
        "encoding": headers.get("Accept-Encoding"),
        "method": request.method,
        "cache-control": headers.get("Cache-Control"),
        "x-forwarded-for": forwarded.split(',')[0],
        "x-forwarded-proto": headers.get("X-Forwarded-Proto"),
    }
    return head
|
|
|
|
|
|
def getcity(ip):
    """Look up *ip* in the GeoLite2 City database.

    Returns the record produced by the reader's ``city()`` call, or the
    integer ``0`` when the lookup cannot be completed for any reason
    (callers only test the result for truthiness).
    """
    geoipath = "/var/lib/GeoIP"
    try:
        with geoip2.database.Reader(geoipath + '/GeoLite2-City.mmdb') as city_reader:
            return city_reader.city(ip)
    except Exception:
        # Best effort: a missing database file, a malformed address or an
        # address without a record all mean "no data". Unlike a bare
        # ``except:``, this lets KeyboardInterrupt/SystemExit propagate.
        return 0
|
|
|
|
|
|
def getasn(ip):
    """Look up *ip* in the GeoLite2 ASN database.

    Returns the record produced by the reader's ``asn()`` call, or the
    integer ``0`` when the lookup cannot be completed for any reason
    (callers only test the result for truthiness).
    """
    geoipath = "/var/lib/GeoIP"
    try:
        with geoip2.database.Reader(geoipath + '/GeoLite2-ASN.mmdb') as asn_reader:
            return asn_reader.asn(ip)
    except Exception:
        # Best effort: a missing database file, a malformed address or an
        # address without a record all mean "no data". Unlike a bare
        # ``except:``, this lets KeyboardInterrupt/SystemExit propagate.
        return 0
|
|
|
|
|
|
def getssl(ip):
    """Fetch TLS certificate details from *ip* (a host name) on port 443.

    Opens a verified TLS connection (IPv4, 1 second timeout) using the
    default SSL context and extracts a few fields from the peer
    certificate.

    Returns a dict with the keys ``ssl_subject``, ``ssl_san``,
    ``ssl_issuer``, ``ssl_not_before`` and ``ssl_not_after``, or the
    integer ``0`` when the handshake cannot be completed. Subject/issuer
    fields missing from the certificate are reported as ``None`` and a
    missing subjectAltName as an empty list.
    """
    ctx = ssl.create_default_context()

    raw_sock = socket.socket(socket.AF_INET)
    try:
        # wrap_socket itself can raise ValueError (empty host name, IDNA
        # encoding failure), so it belongs inside the guarded region.
        with ctx.wrap_socket(raw_sock, server_hostname=ip) as conn:
            conn.settimeout(1)
            conn.connect((ip, 443))
            cert = conn.getpeercert()
    except (OSError, ValueError):
        # OSError covers timeouts, refused connections, DNS failures and
        # ssl.SSLError (including certificate verification errors).
        return 0
    finally:
        # No-op once wrap_socket has taken over the descriptor; releases it
        # when wrapping failed early.
        raw_sock.close()

    if not cert:
        return 0

    # Each RDN is a tuple of (key, value) pairs; keep the first pair of each.
    subject = dict(rdn[0] for rdn in cert.get('subject', ()))
    issuer = dict(rdn[0] for rdn in cert.get('issuer', ()))
    date_format = '%b %d %H:%M:%S %Y %Z'

    sslinfo = {
        "ssl_subject": subject.get('commonName'),
        "ssl_san": [entry[1] for entry in cert.get('subjectAltName', ())],
        "ssl_issuer": issuer.get('organizationName'),
        "ssl_not_before": datetime.strptime(cert['notBefore'], date_format),
        "ssl_not_after": datetime.strptime(cert['notAfter'], date_format),
    }
    return sslinfo
|
|
|
|
|
|
def getgeo(ip):
    """Assemble geolocation and network details for *ip*.

    Returns a dict (address, hostname, country, city, coordinates, ISP and
    ASN), or the integer ``0`` as soon as either the city lookup or the ASN
    lookup yields nothing.
    """
    infos = {"ip_address": ip, "hostname": gethost(ip)}

    city_res = getcity(ip)
    if not city_res:
        return 0
    infos["country"] = city_res.country.name
    infos["country_code"] = city_res.country.iso_code
    infos["city"] = city_res.city.name
    infos["latitude"] = city_res.location.latitude
    infos["longitude"] = city_res.location.longitude

    asn_res = getasn(ip)
    if not asn_res:
        return 0
    infos["isp"] = asn_res.autonomous_system_organization
    infos["asn"] = f"AS{asn_res.autonomous_system_number}"

    return infos
|
|
|
|
|
|
def getself(request, info):
    """Answer a query about the requesting client itself.

    *info* selects what is returned:

    * up to five digits -- treated as a port number; reports whether that
      port is open on the client's address;
    * ``"all"`` -- every known field, rendered as YAML;
    * ``"json"`` -- the raw dict of fields;
    * any other value -- looked up (case-insensitively) as a single field
      name; list values are rendered as YAML, everything else as text
      followed by a newline.

    Header-derived fields are always present; geolocation fields are merged
    in when available.
    """
    ip = getclientip(request)

    if info.isdigit() and len(info) <= 5:
        return f"Port {info} is {getport(ip, info)}\n"

    info = info.lower()
    infos = gethead(request, ip)
    geodata = getgeo(ip)
    if geodata:
        infos.update(geodata)

    if info == "all":
        return to_yaml(infos)
    if info == "json":
        return infos

    if info not in infos:
        return f"No data about {info} for {ip}\n"

    value = infos[info]
    if isinstance(value, list):
        # Index the entries so they render as a numbered YAML mapping.
        return to_yaml(dict(enumerate(value)))

    # Values may be None (absent header), int or float; format them instead
    # of concatenating, which raised TypeError for anything but str.
    return f"{value}\n"
|
|
|
|
|
|
def getport(ip, port):
    """Report whether TCP *port* on *ip* accepts connections.

    *port* may be an int or a string of digits. Returns ``"open"`` when an
    IPv4 TCP connection succeeds within 0.5 seconds, and ``"closed"`` when
    it fails or when the port is outside 1-65535.
    """
    port_number = int(port)
    if not 1 <= port_number <= 65535:
        return "closed"

    try:
        # The context manager closes the probe socket on every path.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.settimeout(0.5)
            probe.connect((ip, port_number))
    except (OSError, TypeError, ValueError):
        # OSError: refused / timed out / unresolvable;
        # TypeError/ValueError: unusable address value.
        return "closed"
    return "open"
|
|
|
|
|
|
@app.route('/metrics')
def getmetrics():
    """Expose the lookup counter as a single ``name value`` metrics line."""
    metric_line = f"geoipweb_queries {queries}\n"
    return metric_line
|
|
|
|
|
|
@app.route('/<ip>/<info>')
def getspec(ip, info):
    """Return one piece of information about an IP address or host name.

    *ip* may be an address or a host name (resolved first). *info* selects
    the answer:

    * up to five digits -- a port number; reports open/closed;
    * ``"all"`` -- every known field as YAML;
    * ``"json"`` -- the raw dict of fields;
    * any other value -- a single field name (case-insensitive).

    Addresses inside one of the ``private`` ranges are refused.
    """
    global queries
    queries += 1

    domain = None
    if any(c.isalpha() for c in ip) and '.' in ip:
        domain = ip
        ip = getip(ip)
        if ip == 0:
            return "Host unavailable\n"

    # ``private`` holds CIDR ranges, so membership must be tested as a
    # network containment check, not a string comparison.
    try:
        address = ipaddress.ip_address(ip)
    except ValueError:
        address = None
    if address is not None and any(
            address in ipaddress.ip_network(net) for net in private):
        return f"{ip} is a private IP\n"

    if info.isdigit() and len(info) <= 5:
        return f"Port {info} is {getport(ip, info)}\n"

    info = info.lower()
    infos = getgeo(ip)
    if infos == 0:
        infos = {"Error": f"No location data about {ip}"}

    if domain is not None:
        infos["domain"] = domain
        sslinfos = getssl(domain)
        if sslinfos != 0:
            infos.update(sslinfos)

    if info == "all":
        return to_yaml(infos)
    if info == "json":
        return infos

    if info not in infos:
        return f"No data about {info} for {ip}\n"

    value = infos[info]
    if isinstance(value, list):
        # Index the entries so they render as a numbered YAML mapping.
        return to_yaml(dict(enumerate(value)))

    return str(value) + '\n'
|
|
|
|
|
|
@app.route('/<ip>')
def getinfo(ip):
    """Show everything known about an IP address or host name.

    A path segment containing neither ``.`` nor ``:`` is not an address; it
    is treated as a query about the requesting client and delegated to
    ``getself`` (e.g. a port number or a field name).

    Command-line clients (matched by User-Agent) receive YAML; other
    clients receive the rendered HTML template. Addresses inside one of the
    ``private`` ranges are refused.
    """
    global queries
    queries += 1

    domain = None
    if any(c.isalpha() for c in ip) and '.' in ip:
        domain = ip
        ip = getip(ip)
        if ip == 0:
            return "Host unavailable\n"

    # Covers bare words and pure digit strings alike.
    if '.' not in ip and ':' not in ip:
        return getself(request, ip)

    # ``private`` holds CIDR ranges, so membership must be tested as a
    # network containment check, not a string comparison.
    try:
        address = ipaddress.ip_address(ip)
    except ValueError:
        address = None
    if address is not None and any(
            address in ipaddress.ip_network(net) for net in private):
        return f"{ip} is a private IP\n"

    infos = getgeo(ip)
    if infos == 0:
        infos = {"Error": f"No location data about {ip}"}

    if domain is not None:
        infos["domain"] = domain
        sslinfos = getssl(domain)
        if sslinfos != 0:
            infos.update(sslinfos)

    # The header may be absent; treat that as "not a CLI client" instead of
    # failing on ``x in None``.
    user_agent = request.headers.get('User-Agent') or ''
    if any(x in user_agent for x in cli):
        return to_yaml(infos)

    title = get_name(request.headers.get("host"))
    return render_template('template.html',
                           hostname=request.headers.get("host"),
                           title=title,
                           ip=request.headers.get('X-Client-Ip'),
                           infos=infos,
                           color=random.choice(colors))
|
|
|
|
|
|
@app.route('/<ip>/json')
def getinfojson(ip):
    """Return everything known about an IP address or host name as a dict.

    Host names are resolved first and, when resolution succeeds, the result
    also carries the domain and its TLS certificate details. Addresses
    inside one of the ``private`` ranges are refused with a text message.
    """
    global queries
    queries += 1

    domain = None
    if any(c.isalpha() for c in ip) and '.' in ip:
        domain = ip
        ip = getip(ip)
        if ip == 0:
            return "Host unavailable\n"

    # ``private`` holds CIDR ranges, so membership must be tested as a
    # network containment check, not a string comparison.
    try:
        address = ipaddress.ip_address(ip)
    except ValueError:
        address = None
    if address is not None and any(
            address in ipaddress.ip_network(net) for net in private):
        return f"{ip} is a private IP\n"

    infos = getgeo(ip)
    if infos == 0:
        infos = {"Error": f"No location data about {ip}"}

    if domain is not None:
        infos["domain"] = domain
        sslinfos = getssl(domain)
        if sslinfos != 0:
            infos.update(sslinfos)

    return infos
|
|
|
|
|
|
@app.route('/')
def self():
    """Landing page: report on the requesting client.

    Command-line clients (matched by User-Agent) receive just their IP
    address as plain text; other clients receive the rendered HTML template
    with header and geolocation details.
    """
    global queries
    queries += 1

    ip = getclientip(request)

    # The header may be absent; treat that as "not a CLI client" instead of
    # failing on ``x in None``.
    user_agent = request.headers.get('User-Agent') or ''
    if any(x in user_agent for x in cli):
        return str(ip) + '\n'

    head = gethead(request, ip)
    infos = getgeo(ip)
    if infos == 0:
        infos = {"Error": f"No location data about {ip}"}

    title = get_name(request.headers.get("host"))
    return render_template('template.html',
                           hostname=request.headers.get("host"),
                           title=title,
                           ip=ip,
                           head=head,
                           infos=infos,
                           color=random.choice(colors))
|
|
|
|
|
|
@app.route('/favicon.ico')
def favicon():
    """Answer favicon requests with a 404 so they are not treated as lookups."""
    body, status = "Not found", 404
    return body, status
|