ja3/python/ja3s.py
Patrick Kelley c3f8aeaa1f Initial
2025-05-07 14:11:26 -04:00

198 lines
6.2 KiB
Python

#!/usr/bin/env python
"""Generate JA3 fingerprints from PCAPs using Python."""
import argparse
import dpkt
import json
import socket
import struct
import os
from hashlib import md5
from distutils.version import LooseVersion
__author__ = "Tommy Stallings"
__copyright__ = "Copyright (c) 2017, salesforce.com, inc."
__credits__ = ["John B. Althouse", "Jeff Atkinson", "Josh Atkins"]
__license__ = "BSD 3-Clause License"
__version__ = "1.0.1"
__maintainer__ = "Tommy Stallings, Brandon Dixon"
__email__ = "tommy.stallings2@gmail.com"
SSL_PORT = 443
TLS_HANDSHAKE = 22
def convert_ip(value):
"""Convert an IP address from binary to text.
:param value: Raw binary data to convert
:type value: str
:returns: str
"""
try:
return socket.inet_ntop(socket.AF_INET, value)
except ValueError:
return socket.inet_ntop(socket.AF_INET6, value)
def process_extensions(server_handshake):
"""Process any extra extensions and convert to a JA3 segment.
:param client_handshake: Handshake data from the packet
:type client_handshake: dpkt.ssl.TLSClientHello
:returns: list
"""
if not hasattr(server_handshake, "extensions"):
# Needed to preserve commas on the join
return [""]
exts = list()
for ext_val, ext_data in server_handshake.extensions:
exts.append(ext_val)
results = list()
results.append("-".join([str(x) for x in exts]))
return results
def process_pcap(pcap, any_port=False):
"""Process packets within the PCAP.
:param pcap: Opened PCAP file to be processed
:type pcap: dpkt.pcap.Reader
:param any_port: Whether or not to search for non-SSL ports
:type any_port: bool
"""
decoder = dpkt.ethernet.Ethernet
linktype = pcap.datalink()
if linktype == dpkt.pcap.DLT_LINUX_SLL:
decoder = dpkt.sll.SLL
elif linktype == dpkt.pcap.DLT_NULL or linktype == dpkt.pcap.DLT_LOOP:
decoder = dpkt.loopback.Loopback
results = list()
for timestamp, buf in pcap:
try:
eth = decoder(buf)
except Exception:
continue
if not isinstance(eth.data, (dpkt.ip.IP, dpkt.ip6.IP6)):
# We want an IP packet
continue
if not isinstance(eth.data.data, dpkt.tcp.TCP):
# TCP only
continue
ip = eth.data
tcp = ip.data
if not (tcp.dport == SSL_PORT or tcp.sport == SSL_PORT or any_port):
# Doesn't match SSL port or we are picky
continue
if len(tcp.data) <= 0:
continue
tls_handshake = bytearray(tcp.data)
if tls_handshake[0] != TLS_HANDSHAKE:
continue
records = list()
try:
records, bytes_used = dpkt.ssl.tls_multi_factory(tcp.data)
except dpkt.ssl.SSL3Exception:
continue
except dpkt.dpkt.NeedData:
continue
if len(records) <= 0:
continue
for record in records:
if record.type != TLS_HANDSHAKE:
continue
if len(record.data) == 0:
continue
server_hello = bytearray(record.data)
if server_hello[0] != 2:
# We only want server HELLO
continue
try:
handshake = dpkt.ssl.TLSHandshake(record.data)
except dpkt.dpkt.NeedData:
# Looking for a handshake here
continue
if not isinstance(handshake.data, dpkt.ssl.TLSServerHello):
# Still not the HELLO
continue
server_handshake = handshake.data
ja3 = [str(server_handshake.version)]
# Cipher Suites (16 bit values)
if LooseVersion(dpkt.__version__) <= LooseVersion('1.9.1'):
ja3.append(str(server_handshake.cipher_suite))
else:
ja3.append(str(server_handshake.ciphersuite.code))
ja3 += process_extensions(server_handshake)
ja3 = ",".join(ja3)
record = {"source_ip": convert_ip(ip.src),
"destination_ip": convert_ip(ip.dst),
"source_port": tcp.sport,
"destination_port": tcp.dport,
"ja3": ja3,
"ja3_digest": md5(ja3.encode()).hexdigest(),
"timestamp": timestamp}
results.append(record)
return results
def main():
"""Intake arguments from the user and print out JA3 output."""
desc = "A python script for extracting JA3 fingerprints from PCAP files"
parser = argparse.ArgumentParser(description=(desc))
parser.add_argument("pcap", help="The pcap file to process")
help_text = "Look for client hellos on any port instead of just 443"
parser.add_argument("-a", "--any_port", required=False,
action="store_true", default=False,
help=help_text)
help_text = "Print out as JSON records for downstream parsing"
parser.add_argument("-j", "--json", required=False, action="store_true",
default=False, help=help_text)
args = parser.parse_args()
# Use an iterator to process each line of the file
output = None
with open(args.pcap, 'rb') as fp:
try:
capture = dpkt.pcap.Reader(fp)
except ValueError as e_pcap:
try:
fp.seek(0, os.SEEK_SET)
capture = dpkt.pcapng.Reader(fp)
except ValueError as e_pcapng:
raise Exception(
"File doesn't appear to be a PCAP or PCAPng: %s, %s" %
(e_pcap, e_pcapng))
output = process_pcap(capture, any_port=args.any_port)
if args.json:
output = json.dumps(output, indent=4, sort_keys=True)
print(output)
else:
for record in output:
tmp = '[{dest}:{port}] JA3S: {segment} --> {digest}'
tmp = tmp.format(dest=record['destination_ip'],
port=record['destination_port'],
segment=record['ja3'],
digest=record['ja3_digest'])
print(tmp)
if __name__ == "__main__":
main()