#!/usr/bin/env python3
import datetime
import os
import shutil
import traceback

import matplotlib.pyplot as plt
import numpy as np
import soundfile as sf
from scipy.fft import rfft, rfftfreq

RECORDINGS_DIR = "recordings"
PROCESSED_RECORDINGS_DIR = "recordings/processed"
DETECTIONS_DIR = "events"

DETECT_FREQUENCY = 211  # Hz
DETECT_FREQUENCY_TOLERANCE = 2  # Hz
ADJACENCY_FACTOR = 2  # search range around the target frequency (e.g. 2 means 100Hz to 400Hz for a 200Hz target)
BLOCK_SECONDS = 3  # seconds (longer means more frequency resolution, but less time resolution)
DETECTION_DISTANCE_SECONDS = 30  # seconds (minimum time between detections)
BLOCK_OVERLAP_FACTOR = 0.9  # overlap between blocks (0.2 means 20% overlap)
MIN_SIGNAL_QUALITY = 1000.0  # minimum ratio of peak amplitude to the surrounding noise floor for a valid detection
PLOT_PADDING_START_SECONDS = 2  # seconds of audio included before the event in the clip and plot
PLOT_PADDING_END_SECONDS = 3  # seconds of audio included after the event in the clip and plot

DETECTION_DISTANCE_BLOCKS = DETECTION_DISTANCE_SECONDS // BLOCK_SECONDS  # number of blocks to skip after a detection
DETECT_FREQUENCY_FROM = DETECT_FREQUENCY - DETECT_FREQUENCY_TOLERANCE  # Hz
DETECT_FREQUENCY_TO = DETECT_FREQUENCY + DETECT_FREQUENCY_TOLERANCE  # Hz


def process_recording(filename):
    print('processing', filename)

    # parse the recording start date (microsecond precision, with timezone) from the filename
    date_string_from_filename = os.path.splitext(filename)[0]
    recording_date = datetime.datetime.strptime(date_string_from_filename, "%Y-%m-%d_%H-%M-%S.%f%z")

    # get data and metadata from recording
    path = os.path.join(RECORDINGS_DIR, filename)
    soundfile = sf.SoundFile(path)
    samplerate = soundfile.samplerate
    samples_per_block = int(BLOCK_SECONDS * samplerate)
    overlapping_samples = int(samples_per_block * BLOCK_OVERLAP_FACTOR)

    sample_num = 0
    current_event = None
    while sample_num < soundfile.frames:
        soundfile.seek(sample_num)
        block = soundfile.read(frames=samples_per_block, dtype='float32', always_2d=False)
        if len(block) == 0:
            break

        # calculate FFT
        labels = rfftfreq(len(block), d=1/samplerate)
        complex_amplitudes = rfft(block)
        amplitudes = np.abs(complex_amplitudes)

        # get the frequency with the highest amplitude within the search range
        search_mask = (labels >= DETECT_FREQUENCY_FROM / ADJACENCY_FACTOR) & (labels <= DETECT_FREQUENCY_TO * ADJACENCY_FACTOR)
        search_amplitudes = amplitudes[search_mask]
        search_labels = labels[search_mask]
        max_amplitude_index = np.argmax(search_amplitudes)
        max_amplitude = search_amplitudes[max_amplitude_index]
        max_freq = search_labels[max_amplitude_index]
        max_freq_detected = DETECT_FREQUENCY_FROM <= max_freq <= DETECT_FREQUENCY_TO

        # calculate signal quality: peak amplitude relative to the mean amplitude outside the detection band
        adjacent_amplitudes = amplitudes[(labels < DETECT_FREQUENCY_FROM) | (labels > DETECT_FREQUENCY_TO)]
        signal_quality = max_amplitude / np.mean(adjacent_amplitudes)
        good_signal_quality = signal_quality > MIN_SIGNAL_QUALITY

        # conclude detection
        if max_freq_detected and good_signal_quality:
            block_date = recording_date + datetime.timedelta(seconds=sample_num / samplerate)
            if not current_event:
                # start a new event
                current_event = {
                    'start_at': block_date,
                    'end_at': block_date,
                    'start_sample': sample_num,
                    'end_sample': sample_num + samples_per_block,
                    'start_freq': max_freq,
                    'end_freq': max_freq,
                    'max_amplitude': max_amplitude,
                }
            else:
                # extend the running event
                current_event.update({
                    'end_at': block_date,
                    'end_freq': max_freq,
                    'end_sample': sample_num + samples_per_block,
                    'max_amplitude': max(max_amplitude, current_event['max_amplitude']),
                })
            print(f"- {block_date.strftime('%Y-%m-%d %H:%M:%S')}: {max_amplitude:.1f}rDB @ {max_freq:.1f}Hz (signal {signal_quality:.3f}x)")
        else:
            # not detecting an event; flush the one that just ended, if any
            if current_event:
                duration = (current_event['end_at'] - current_event['start_at']).total_seconds()
                current_event['duration'] = duration
                print(f"🔊 {current_event['start_at'].strftime('%Y-%m-%d %H:%M:%S')} ({duration:.1f}s): "
                      f"{current_event['start_freq']:.1f}Hz->{current_event['end_freq']:.1f}Hz "
                      f"@{current_event['max_amplitude']:.0f}rDB")
                # read full audio clip again for writing
                write_event(current_event=current_event, soundfile=soundfile, samplerate=samplerate)
                current_event = None
                sample_num += DETECTION_DISTANCE_BLOCKS * samples_per_block
        sample_num += samples_per_block - overlapping_samples

    # close the reader so the file can safely be moved afterwards
    soundfile.close()


# write an audio clip and a spectrogram covering the event, plus some padding
def write_event(current_event, soundfile, samplerate):
    # date and filename
    event_date = current_event['start_at'] - datetime.timedelta(seconds=PLOT_PADDING_START_SECONDS)
    filename_prefix = event_date.strftime('%Y-%m-%d_%H-%M-%S.%f%z')

    # event clip; clamp the padded start so it cannot seek before the start of the recording
    event_start_sample = max(0, current_event['start_sample'] - samplerate * PLOT_PADDING_START_SECONDS)
    event_end_sample = current_event['end_sample'] + samplerate * PLOT_PADDING_END_SECONDS
    total_samples = event_end_sample - event_start_sample
    soundfile.seek(event_start_sample)
    event_clip = soundfile.read(frames=total_samples, dtype='float32', always_2d=False)

    # write flac
    flac_path = os.path.join(DETECTIONS_DIR, f"{filename_prefix}.flac")
    sf.write(flac_path, event_clip, samplerate, format='FLAC')

    # write spectrogram
    plt.figure(figsize=(8, 6))
    plt.specgram(event_clip, Fs=samplerate, NFFT=samplerate, noverlap=samplerate//2, cmap='inferno', vmin=-100, vmax=-10)
    plt.title(f"Bootshorn @{event_date.strftime('%Y-%m-%d %H:%M:%S%z')}")
    plt.xlabel(f"Time {current_event['duration']:.1f}s")
    plt.ylabel(f"Frequency {current_event['start_freq']:.1f}Hz -> {current_event['end_freq']:.1f}Hz")
    plt.colorbar(label="Intensity (rDB)")
    plt.ylim(50, 1000)
    plt.savefig(os.path.join(DETECTIONS_DIR, f"{filename_prefix}.png"))
    plt.close()


def main():
    os.makedirs(RECORDINGS_DIR, exist_ok=True)
    os.makedirs(PROCESSED_RECORDINGS_DIR, exist_ok=True)
    os.makedirs(DETECTIONS_DIR, exist_ok=True)  # write_event() writes here, so create it up front as well

    for filename in sorted(os.listdir(RECORDINGS_DIR)):
        if filename.endswith(".flac"):
            try:
                process_recording(filename)
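                # Assumption: shutil is imported and PROCESSED_RECORDINGS_DIR is created,
                # but neither is used anywhere else, which suggests finished recordings
                # were meant to be moved out of the inbox so they are not processed
                # twice. A minimal sketch of that step:
                shutil.move(os.path.join(RECORDINGS_DIR, filename),
                            os.path.join(PROCESSED_RECORDINGS_DIR, filename))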
            except Exception as e:
                print(f"Error processing {filename}: {e}")
                # print stacktrace
                traceback.print_exc()


if __name__ == "__main__":
    main()