bootshorn_investigation/process
2025-06-01 14:14:00 +02:00

106 lines
No EOL
4.1 KiB
Python
Executable file

#!/usr/bin/env python3
import os
import concurrent.futures
import datetime
import numpy as np
import matplotlib.pyplot as plt
import soundfile
import scipy.signal
from scipy.fft import rfft, rfftfreq
import shutil
RECORDINGS_DIR = "recordings"
PROCESSED_RECORDINGS_DIR = "recordings/processed"
DETECTIONS_DIR = "events"
DETECT_FREQUENCY = 211 # Hz
DETECT_FREQUENCY_TOLERANCE = 2 # Hz
DETECT_FREQUENCY_FROM = DETECT_FREQUENCY - DETECT_FREQUENCY_TOLERANCE # Hz
DETECT_FREQUENCY_TO = DETECT_FREQUENCY + DETECT_FREQUENCY_TOLERANCE # Hz
ADJACENCY_FACTOR = 2 # area to look for noise around the target frequency
AMPLITUDE_THRESHOLD = 200 # relative DB (rDB) (because not calibrated)
BLOCK_SECONDS = 3 # seconds (longer means more frequency resolution, but less time resolution)
DETECTION_DISTANCE = 30 # seconds (minimum time between detections)
BLOCK_OVERLAP = 0.8 # overlap between blocks (0.8 means 80% overlap)
def process_recording(filename):
print('processing', filename)
# get ISO 8601 nanosecond recording date from filename
date_string_from_filename = os.path.splitext(filename)[0]
recording_date = datetime.datetime.strptime(date_string_from_filename, "%Y-%m-%d_%H-%M-%S.%f%z")
# get data and metadata from recording
path = os.path.join(RECORDINGS_DIR, filename)
sound, samplerate = soundfile.read(path)
samples_per_block = int(BLOCK_SECONDS * samplerate)
overlap = int(samples_per_block * BLOCK_OVERLAP)
# get median amplitude for normalization
complex_amplitudes_global = rfft(sound)
median_amplitude = np.median(np.abs(complex_amplitudes_global))
print(f'median amplitude: {median_amplitude:.5f}rDB')
# initialize to a very old date
last_detection_at = datetime.datetime.min.replace(tzinfo=recording_date.tzinfo)
is_detecting = False
# read blocks of audio data with overlap from sound variable
for num in range(0, len(sound) // (samples_per_block - overlap)):
start = num * (samples_per_block - overlap)
end = start + samples_per_block
block = sound[start:end]
block_date = recording_date + datetime.timedelta(seconds=num * BLOCK_SECONDS)
labels = rfftfreq(len(block), d=1/samplerate)
complex_amplitudes = rfft(block)
absolute_amplitudes = np.abs(complex_amplitudes)
amplitudes = absolute_amplitudes / median_amplitude
# get amplitudes only between 100 and 1000 Hz
adjacent_amplitudes = amplitudes[(labels >= DETECT_FREQUENCY_FROM/ADJACENCY_FACTOR) & (labels <= DETECT_FREQUENCY_TO*ADJACENCY_FACTOR)]
adjacent_labels = labels[(labels >= DETECT_FREQUENCY_FROM/ADJACENCY_FACTOR) & (labels <= DETECT_FREQUENCY_TO*ADJACENCY_FACTOR)]
# get the frequency with the highest amplitude
max_amplitude = max(adjacent_amplitudes)
max_amplitude_index = np.argmax(adjacent_amplitudes)
max_freq = adjacent_labels[max_amplitude_index]
# get the average amplitude of the adjacent frequencies
noise = np.mean(adjacent_amplitudes)/max_amplitude
# check for detection criteria
max_freq_detected = DETECT_FREQUENCY_FROM <= max_freq <= DETECT_FREQUENCY_TO
amplitude_detected = max_amplitude > AMPLITUDE_THRESHOLD
low_noise_detected = noise < 0.1
no_recent_detection = is_detecting or (block_date - last_detection_at).total_seconds() > DETECTION_DISTANCE
# conclude detection
if (
max_freq_detected and
amplitude_detected and
low_noise_detected and
no_recent_detection
):
if not is_detecting:
is_detecting = True
last_detection_at = block_date
print("🔊")
print(f'{block_date}: {max_amplitude:.1f}rDB @ {max_freq:.1f}Hz (noise {noise:.3f}rDB)')
else:
is_detecting = False
def main():
os.makedirs(RECORDINGS_DIR, exist_ok=True)
os.makedirs(PROCESSED_RECORDINGS_DIR, exist_ok=True)
for filename in os.listdir(RECORDINGS_DIR):
if filename.endswith(".flac"):
process_recording(filename)
if __name__ == "__main__":
main()