#!/usr/bin/env python3
"""Scan FLAC recordings for a narrow-band tone around DETECT_FREQUENCY.

Every ``*.flac`` file in RECORDINGS_DIR is read in overlapping blocks. For each
block the spectrum is computed and a detection is logged when the strongest
component inside the search band lies within the target frequency window, is
louder than AMPLITUDE_THRESHOLD and clearly stands out from its neighbours.
Consecutive detecting blocks are merged into one event.
"""

import concurrent.futures
import datetime
import os
import shutil

import matplotlib.pyplot as plt
import numpy as np
import scipy.signal
import soundfile
from scipy.fft import rfft, rfftfreq

RECORDINGS_DIR = "recordings"
PROCESSED_RECORDINGS_DIR = "recordings/processed"
DETECTIONS_DIR = "events"

DETECT_FREQUENCY = 211  # Hz
DETECT_FREQUENCY_TOLERANCE = 2  # Hz
DETECT_FREQUENCY_FROM = DETECT_FREQUENCY - DETECT_FREQUENCY_TOLERANCE  # Hz
DETECT_FREQUENCY_TO = DETECT_FREQUENCY + DETECT_FREQUENCY_TOLERANCE  # Hz
ADJACENCY_FACTOR = 2  # area to look for noise around the target frequency
# Uncalibrated FFT magnitude; shown as "rDB" (relative dB) in the log output.
AMPLITUDE_THRESHOLD = 200
# Upper limit for (mean neighbouring magnitude / peak magnitude).
MAX_NOISE_RATIO = 0.1
BLOCK_SECONDS = 3  # seconds (longer means more frequency resolution, but less time resolution)
DETECTION_DISTANCE = 30  # seconds (minimum time between detections)
BLOCK_OVERLAP_FACTOR = 0.8  # overlap between blocks (0.8 means 80% overlap)

_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'


def _analyze_block(block, samplerate):
    """Return ``(peak_freq, peak_amplitude, noise_ratio)`` for one audio block.

    Returns ``None`` when the block cannot be evaluated: it is empty, it is so
    short that no FFT bin falls inside the search band, or the band is
    completely silent. Callers treat ``None`` as "no detection".
    """
    # Multi-channel blocks arrive as (frames, channels); mix down to mono so
    # the FFT runs along the time axis.
    if block.ndim > 1:
        block = block.mean(axis=1)
    if len(block) == 0:
        return None

    labels = rfftfreq(len(block), d=1 / samplerate)
    amplitudes = np.abs(rfft(block))

    # Restrict the spectrum to the target window plus its neighbourhood,
    # i.e. DETECT_FREQUENCY_FROM / ADJACENCY_FACTOR up to
    # DETECT_FREQUENCY_TO * ADJACENCY_FACTOR.
    search_mask = (
        (labels >= DETECT_FREQUENCY_FROM / ADJACENCY_FACTOR)
        & (labels <= DETECT_FREQUENCY_TO * ADJACENCY_FACTOR)
    )
    search_amplitudes = amplitudes[search_mask]
    search_labels = labels[search_mask]
    if search_amplitudes.size == 0:
        # Happens for a short trailing block whose bins are too coarse.
        return None

    # Strongest component inside the search band.
    peak_index = np.argmax(search_amplitudes)
    max_amplitude = search_amplitudes[peak_index]
    max_freq = search_labels[peak_index]

    # Everything in the search band outside the target window counts as noise.
    adjacent_amplitudes = search_amplitudes[
        (search_labels < DETECT_FREQUENCY_FROM)
        | (search_labels > DETECT_FREQUENCY_TO)
    ]
    if adjacent_amplitudes.size == 0 or max_amplitude == 0:
        # No reference noise or a silent block: the ratio is undefined.
        return None

    noise = np.mean(adjacent_amplitudes) / max_amplitude
    return max_freq, max_amplitude, noise


def _report_event(event):
    """Print the summary line of a finished event and trigger its exports."""
    duration = (event['end_at'] - event['start_at']).total_seconds()
    started = event['start_at'].strftime(_TIMESTAMP_FORMAT)
    print(
        f"🔊 {started} ({duration:.1f}s): "
        f"{event['start_freq']:.1f}Hz->{event['end_freq']:.1f}Hz "
        f"@{event['max_amplitude']:.0f}rDB"
    )
    write_clip()
    write_plot()


def process_recording(filename):
    """Detect tone events in one recording and print them.

    Args:
        filename: Name of a file inside RECORDINGS_DIR. The stem must be a
            timestamp in the form ``%Y-%m-%d_%H-%M-%S.%f%z``; it is used as the
            start time of the recording.

    Raises:
        ValueError: If the file name does not match the timestamp format.
    """
    print('processing', filename)

    # get ISO 8601 nanosecond recording date from filename
    date_string_from_filename = os.path.splitext(filename)[0]
    recording_date = datetime.datetime.strptime(
        date_string_from_filename, "%Y-%m-%d_%H-%M-%S.%f%z"
    )

    # get data and metadata from recording
    path = os.path.join(RECORDINGS_DIR, filename)
    samplerate = soundfile.info(path).samplerate
    samples_per_block = int(BLOCK_SECONDS * samplerate)
    overlapping_samples = int(samples_per_block * BLOCK_OVERLAP_FACTOR)
    # Each new block starts this many samples after the previous one.
    hop_samples = samples_per_block - overlapping_samples

    # cache data about current event
    current_event = None
    # After an event ends, detections are suppressed until this point in time.
    quiet_until = None

    # read blocks of audio data with overlap from sound file
    blocks = soundfile.blocks(
        path, blocksize=samples_per_block, overlap=overlapping_samples
    )
    for block_index, block in enumerate(blocks):
        # Start time of this block; the first block starts with the recording.
        block_date = recording_date + datetime.timedelta(
            seconds=block_index * hop_samples / samplerate
        )

        # Enforce the minimum distance between two events.
        if quiet_until is not None:
            if block_date < quiet_until:
                continue
            quiet_until = None

        analysis = _analyze_block(block, samplerate)

        # check for detection criteria
        detected = False
        if analysis is not None:
            max_freq, max_amplitude, noise = analysis
            max_freq_detected = DETECT_FREQUENCY_FROM <= max_freq <= DETECT_FREQUENCY_TO
            amplitude_detected = max_amplitude > AMPLITUDE_THRESHOLD
            low_noise_detected = noise < MAX_NOISE_RATIO
            detected = max_freq_detected and amplitude_detected and low_noise_detected

        if detected:
            # detecting an event
            if not current_event:
                current_event = {
                    'start_at': block_date,
                    'end_at': block_date,
                    'start_freq': max_freq,
                    'end_freq': max_freq,
                    'max_amplitude': max_amplitude,
                }
            else:
                current_event.update({
                    'end_at': block_date,
                    'end_freq': max_freq,
                    'max_amplitude': max(max_amplitude, current_event['max_amplitude']),
                })
            print(
                f"- {block_date.strftime(_TIMESTAMP_FORMAT)}: "
                f"{max_amplitude:.1f}rDB @ {max_freq:.1f}Hz (noise {noise:.3f}rDB)"
            )
        elif current_event:
            # not detecting an event any more: close the running one
            _report_event(current_event)
            quiet_until = current_event['end_at'] + datetime.timedelta(
                seconds=DETECTION_DISTANCE
            )
            current_event = None

    # An event that is still running when the file ends must not get lost.
    if current_event:
        _report_event(current_event)


def write_clip():
    """Placeholder for exporting the audio of an event; not implemented yet."""
    pass


def write_plot():
    """Placeholder for exporting a plot of an event; not implemented yet."""
    pass


def main():
    """Process every ``.flac`` file in RECORDINGS_DIR in sorted name order."""
    os.makedirs(RECORDINGS_DIR, exist_ok=True)
    os.makedirs(PROCESSED_RECORDINGS_DIR, exist_ok=True)

    for filename in sorted(os.listdir(RECORDINGS_DIR)):
        if filename.endswith(".flac"):
            process_recording(filename)


if __name__ == "__main__":
    main()