Files
tchivert 8660abff57
build / build (push) Failing after 1m26s
fix data type
2024-07-14 15:26:36 +02:00

343 lines
9.0 KiB
Python

from flask import Flask, render_template, request
from datetime import datetime
import os
import random
import yaml
import geoip2.database
import socket
import requests
import time
import ssl
template_dir = os.path.abspath('.')
app = Flask(__name__, template_folder=template_dir)
cli = ["curl", "HTTPie", "httpie-go", "Wget", "fetch libfetch",
"Go", "Go-http-client", "ddclient", "Mikrotik", "xh"]
colors = ['#2488bf', '#d84d3d', '#f39700', '#4caf50']
private = [
'10.0.0.0/8',
'172.16.0.0/12',
'192.168.0.0/16',
'169.254.0.0/16',
'127.0.0.0/8',
'fc00::/7',
]
queries = 0
def to_yaml(value):
return (yaml.dump(value, default_flow_style=False))
app.jinja_env.filters['to_yaml'] = to_yaml
def get_name(host):
if os.environ.get('GEOIPNAME') is not None:
return (os.environ.get('GEOIPNAME'))
return (host)
def gethost(ip):
"""Get IP's host"""
try:
name = socket.getnameinfo((ip, 0), 0)[0]
except:
name = ip
return (name)
def getip(ip):
"""Get IP's host"""
if any(c.isalpha() for c in ip) and '.' in ip:
try:
ip = socket.gethostbyname(ip)
except:
ip = 0
return (ip)
def getclientip(request):
"""Get client's IP"""
if request.headers.getlist('X-Forwarded-For'):
return (request.headers.getlist('X-Forwarded-For')[0])
elif request.headers.get('CF-Connecting-IP'):
return (request.headers.get('CF-Connecting-IP'))
else:
return (request.remote_addr)
def gethead(request, ip):
"""Get headers informations about the client"""
ip = getclientip(request)
if request.headers.get("X-Forwarded-For"):
forwarded = request.headers.get("X-Forwarded-For").split(',')[0]
else:
forwarded = "None, there is no forwarded IP"
head = {
"ip_address": ip,
"remote_host": gethost(ip),
"port": request.environ.get('REMOTE_PORT'),
"user-agent": request.headers.get("User-Agent"),
"mime_type": request.headers.get("Accept"),
"language": request.headers.get("Accept-Language"),
"encoding": request.headers.get("Accept-Encoding"),
"method": request.method,
"cache-control": request.headers.get("Cache-Control"),
"x-forwarded-for": forwarded.split(',')[0],
"x-forwarded-proto": request.headers.get("X-Forwarded-Proto")
}
return (head)
def getcity(ip):
"""Get city informations about the client"""
geoipath = "/var/lib/GeoIP"
try:
with geoip2.database.Reader(geoipath + '/GeoLite2-City.mmdb') as city_reader:
city_res = city_reader.city(ip)
except:
return (0)
return (city_res)
def getasn(ip):
"""Get ASN informations about the client"""
geoipath = "/var/lib/GeoIP"
try:
with geoip2.database.Reader(geoipath + '/GeoLite2-ASN.mmdb') as asn_reader:
asn_res = asn_reader.asn(ip)
except:
return (0)
return (asn_res)
def getssl(ip):
"""Get SSL informations about the client"""
ctx = ssl.create_default_context()
conn = ctx.wrap_socket(socket.socket(socket.AF_INET), server_hostname=ip)
conn.settimeout(1)
try:
conn.connect((ip, 443))
cert = conn.getpeercert()
conn.close()
except:
return (0)
sslinfo = {
"ssl_subject": dict(x[0] for x in cert['subject'])['commonName'],
"ssl_san": [x[1] for x in cert['subjectAltName']],
"ssl_issuer": dict(x[0] for x in cert['issuer'])['organizationName'],
"ssl_not_before": datetime.strptime(cert['notBefore'], '%b %d %H:%M:%S %Y %Z'),
"ssl_not_after": datetime.strptime(cert['notAfter'], '%b %d %H:%M:%S %Y %Z'),
}
return (sslinfo)
def getgeo(ip):
"""Get geolocation informations about the client"""
infos = {
"ip_address": ip
}
infos.update({"hostname": gethost(ip)})
city_res = getcity(ip)
if city_res:
infos.update({
"country": city_res.country.name,
"country_code": city_res.country.iso_code,
"city": city_res.city.name,
"latitude": city_res.location.latitude,
"longitude": city_res.location.longitude
})
else:
return (0)
asn_res = getasn(ip)
if asn_res:
infos.update({
"isp": asn_res.autonomous_system_organization,
"asn": f"AS{asn_res.autonomous_system_number}"
})
else:
return (0)
return (infos)
def getself(request, info):
"""Get informations about the client"""
ip = getclientip(request)
if info.isdigit() and len(info) <= 5:
return (f"Port {info} is {getport(ip, info)}\n")
info = info.lower()
infos = gethead(request, ip)
if (geodata := getgeo(ip)):
for k, v in geodata.items():
infos[k] = v
else:
if not infos:
infos = {"Error": f"No location data about {ip}"}
if info == "all":
return (to_yaml(infos))
if info == "json":
return (infos)
if not info in infos and info != "all":
return (f"No data about {info} for {ip}\n")
if isinstance(infos[info], list):
conv = {i: infos[info][i] for i in range(0, len(infos[info]))}
infos[info] = conv
return (to_yaml(infos[info]))
return (infos[info] + '\n')
def getport(ip, port):
"""Check if port is open or closed"""
if int(port) >= 1 and int(port) <= 65535:
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(0.5)
s.connect((ip, int(port)))
return ("open")
s.close()
except:
return ("closed")
return ("closed")
@app.route('/metrics')
def getmetrics():
return (f"geoipweb_queries {str(queries)}\n")
@app.route('/<ip>/<info>')
def getspec(ip, info):
global queries
queries += 1
if any(c.isalpha() for c in ip) and '.' in ip:
domain = ip
ip = getip(ip)
if ip == 0:
return (f"Host unavailable\n")
if ip in private:
return (f"{ip} is a private IP\n")
if info.isdigit() and len(info) <= 5:
return (f"Port {info} is {getport(ip, info)}\n")
info = info.lower()
if (infos := getgeo(ip)) == 0:
infos = {"Error": f"No location data about {ip}"}
if 'domain' in locals():
infos.update({"domain": domain})
if (sslinfos := getssl(domain)) != 0:
infos.update(sslinfos)
if info == "all":
return (to_yaml(infos))
if info == "json":
return (infos)
if not info in infos:
return (f"No data about {info} for {ip}\n")
if isinstance(infos[info], list):
conv = {i: infos[info][i] for i in range(0, len(infos[info]))}
infos[info] = conv
return (to_yaml(infos[info]))
return (str(infos[info]) + '\n')
@app.route('/<ip>')
def getinfo(ip):
global queries
queries += 1
if any(c.isalpha() for c in ip) and '.' in ip:
domain = ip
ip = getip(ip)
if ip == 0:
return (f"Host unavailable\n")
if '.' not in ip and ':' not in ip:
return (getself(request, ip))
if ip.isdigit() and len(ip) <= 5:
return (getself(request, ip))
if ip in private:
return (f"{ip} is a private IP\n")
if (infos := getgeo(ip)) == 0:
infos = {"Error": f"No location data about {ip}"}
if 'domain' in locals():
infos.update({"domain": domain})
if (sslinfos := getssl(domain)) != 0:
infos.update(sslinfos)
if any(x in request.headers.get('User-Agent') for x in cli):
return (to_yaml(infos))
else:
title = get_name(request.headers.get("host"))
return (render_template('template.html',
hostname=request.headers.get("host"),
title=title,
ip=request.headers.get('X-Client-Ip'),
infos=infos,
color=random.choice(colors)))
@app.route('/<ip>/json')
def getinfojson(ip):
global queries
queries += 1
if any(c.isalpha() for c in ip) and '.' in ip:
domain = ip
ip = getip(ip)
if ip == 0:
return (f"Host unavailable\n")
if ip in private:
return (f"{ip} is a private IP\n")
if (infos := getgeo(ip)) == 0:
infos = {"Error": f"No location data about {ip}"}
if 'domain' in locals():
infos.update({"domain": domain})
if (sslinfos := getssl(domain)) != 0:
infos.update(sslinfos)
return (infos)
@app.route('/')
def self():
global queries
queries += 1
ip = getclientip(request)
if any(x in request.headers.get('User-Agent') for x in cli):
return (str(ip) + '\n')
head = gethead(request, ip)
if (infos := getgeo(ip)) == 0:
infos = {"Error": f"No location data about {ip}"}
title = get_name(request.headers.get("host"))
return (render_template('template.html',
hostname=request.headers.get("host"),
title=title,
ip=ip,
head=head,
infos=infos,
color=random.choice(colors)))
@app.route('/favicon.ico')
def favicon():
return ("Not found"), 404