bootshorn_investigation/process
2025-06-01 18:33:56 +02:00

124 lines
No EOL
4.8 KiB
Python
Executable file

#!/usr/bin/env python3
import os
import concurrent.futures
import datetime
import numpy as np
import matplotlib.pyplot as plt
import soundfile
import scipy.signal
from scipy.fft import rfft, rfftfreq
import shutil
RECORDINGS_DIR = "recordings"
PROCESSED_RECORDINGS_DIR = "recordings/processed"
DETECTIONS_DIR = "events"
DETECT_FREQUENCY = 211 # Hz
DETECT_FREQUENCY_TOLERANCE = 2 # Hz
DETECT_FREQUENCY_FROM = DETECT_FREQUENCY - DETECT_FREQUENCY_TOLERANCE # Hz
DETECT_FREQUENCY_TO = DETECT_FREQUENCY + DETECT_FREQUENCY_TOLERANCE # Hz
ADJACENCY_FACTOR = 2 # area to look for noise around the target frequency
BLOCK_SECONDS = 3 # seconds (longer means more frequency resolution, but less time resolution)
DETECTION_DISTANCE = 30 # seconds (minimum time between detections)
BLOCK_OVERLAP_FACTOR = 0.9 # overlap between blocks (0.2 means 20% overlap)
MIN_SIGNAL_QUALITY = 1000.0 # maximum noise level (relative DB) to consider a detection valid
def process_recording(filename):
print('processing', filename)
# get ISO 8601 nanosecond recording date from filename
date_string_from_filename = os.path.splitext(filename)[0]
recording_date = datetime.datetime.strptime(date_string_from_filename, "%Y-%m-%d_%H-%M-%S.%f%z")
# get data and metadata from recording
path = os.path.join(RECORDINGS_DIR, filename)
samplerate = soundfile.info(path).samplerate
samples_per_block = int(BLOCK_SECONDS * samplerate)
overlapping_samples = int(samples_per_block * BLOCK_OVERLAP_FACTOR)
# chache data about current event
current_event = None
# read blocks of audio data with overlap from sound variable
sample_num = 0
for block in soundfile.blocks(path, blocksize=samples_per_block, overlap=overlapping_samples):
sample_num += samples_per_block - overlapping_samples
# get block date and calculate FFT
block_date = recording_date + datetime.timedelta(seconds=sample_num / samplerate)
labels = rfftfreq(len(block), d=1/samplerate)
complex_amplitudes = rfft(block)
amplitudes = np.abs(complex_amplitudes)
# get amplitudes only between 100 and 1000 Hz
search_amplitudes = amplitudes[(labels >= DETECT_FREQUENCY_FROM/ADJACENCY_FACTOR) & (labels <= DETECT_FREQUENCY_TO*ADJACENCY_FACTOR)]
search_labels = labels[(labels >= DETECT_FREQUENCY_FROM/ADJACENCY_FACTOR) & (labels <= DETECT_FREQUENCY_TO*ADJACENCY_FACTOR)]
# get the frequency with the highest amplitude
max_amplitude = max(search_amplitudes)
max_amplitude_index = np.argmax(search_amplitudes)
max_freq = search_labels[max_amplitude_index]
# get the average amplitude of the search frequencies
adjacent_amplitudes = amplitudes[(labels < DETECT_FREQUENCY_FROM) | (labels > DETECT_FREQUENCY_TO)]
signal_quality = max_amplitude/np.mean(adjacent_amplitudes)
# check for detection criteria
max_freq_detected = DETECT_FREQUENCY_FROM <= max_freq <= DETECT_FREQUENCY_TO
good_signal_quality = signal_quality > MIN_SIGNAL_QUALITY
# conclude detection
if (
max_freq_detected and
good_signal_quality
):
# detecting an event
if not current_event:
current_event = {
'start_at': block_date,
'end_at': block_date,
'start_freq': max_freq,
'end_freq': max_freq,
'max_amplitude': max_amplitude,
}
else:
current_event.update({
'end_at': block_date,
'end_freq': max_freq,
'max_amplitude': max(max_amplitude, current_event['max_amplitude']),
})
print(f'- {block_date.strftime('%Y-%m-%d %H:%M:%S')}: {max_amplitude:.1f}rDB @ {max_freq:.1f}Hz (signal {signal_quality:.3f}x)')
else:
# not detecting an event
if current_event:
duration = (current_event['end_at'] - current_event['start_at']).total_seconds()
print(f'🔊 {current_event['start_at'].strftime('%Y-%m-%d %H:%M:%S')} ({duration:.1f}s): {current_event['start_freq']:.1f}Hz->{current_event['end_freq']:.1f}Hz @{current_event['max_amplitude']:.0f}rDB')
write_clip()
write_plot()
current_event = None
#block_num += (DETECTION_DISTANCE // BLOCK_SECONDS) * samples_per_block
#block_num += 1
def write_clip():
pass
def write_plot():
pass
def main():
os.makedirs(RECORDINGS_DIR, exist_ok=True)
os.makedirs(PROCESSED_RECORDINGS_DIR, exist_ok=True)
for filename in sorted(os.listdir(RECORDINGS_DIR)):
if filename.endswith(".flac"):
process_recording(filename)
if __name__ == "__main__":
main()