#!/usr/bin/env python3
"""Scan FLAC recordings for a narrow-band tone around DETECT_FREQUENCY.

Every ``.flac`` file in RECORDINGS_DIR is read in overlapping blocks. For each
block the spectrum is computed and a detection is printed when the strongest
component of the analysis band lies inside the target frequency window, is
louder than AMPLITUDE_THRESHOLD, and clearly stands out from the rest of the
band.
"""
import concurrent.futures
import datetime
import os
import shutil

import matplotlib.pyplot as plt
import numpy as np
import scipy.signal
import soundfile
from scipy.fft import rfft, rfftfreq

RECORDINGS_DIR = "recordings"
PROCESSED_RECORDINGS_DIR = "recordings/processed"
DETECTIONS_DIR = "events"

DETECT_FREQUENCY = 211  # Hz
DETECT_FREQUENCY_TOLERANCE = 2  # Hz
DETECT_FREQUENCY_FROM = DETECT_FREQUENCY - DETECT_FREQUENCY_TOLERANCE  # Hz
DETECT_FREQUENCY_TO = DETECT_FREQUENCY + DETECT_FREQUENCY_TOLERANCE  # Hz
ADJACENCY_FACTOR = 2  # area to look for noise around the target frequency
AMPLITUDE_THRESHOLD = 200  # relative DB (rDB) (because not calibrated)
BLOCK_SECONDS = 3  # seconds (longer means more frequency resolution, but less time resolution)
BLOCK_OVERLAP_FRACTION = 0.8  # share of each block that is re-read in the next block
NOISE_RATIO_THRESHOLD = 0.1  # maximum (mean band amplitude / peak amplitude) for a detection


def _analyze_block(block, samplerate):
    """Find the dominant spectral component of *block* inside the analysis band.

    The analysis band spans DETECT_FREQUENCY_FROM / ADJACENCY_FACTOR up to
    DETECT_FREQUENCY_TO * ADJACENCY_FACTOR.

    Args:
        block: Samples of one block, either 1-D (mono) or 2-D shaped
            (frames, channels).
        samplerate: Sample rate of the recording in Hz.

    Returns:
        A tuple ``(peak_frequency, peak_amplitude, noise_ratio)`` where
        ``noise_ratio`` is the mean band amplitude divided by the peak
        amplitude, or ``None`` when the block has no usable data in the band.
    """
    samples = np.asarray(block)
    if samples.ndim > 1:
        # Downmix to mono: rfft works on the last axis, which would be the
        # channel axis for a (frames, channels) block.
        samples = samples.mean(axis=1)

    amplitudes = np.abs(rfft(samples))
    frequencies = rfftfreq(len(samples), d=1 / samplerate)

    # Restrict the spectrum to the band surrounding the target frequency.
    band_mask = (
        (frequencies >= DETECT_FREQUENCY_FROM / ADJACENCY_FACTOR)
        & (frequencies <= DETECT_FREQUENCY_TO * ADJACENCY_FACTOR)
    )
    band_amplitudes = amplitudes[band_mask]
    if band_amplitudes.size == 0:
        # Block too short to have any FFT bin inside the band.
        return None

    peak_index = np.argmax(band_amplitudes)
    peak_amplitude = band_amplitudes[peak_index]
    if peak_amplitude <= 0:
        # Silent band: the noise ratio would be 0 / 0.
        return None

    peak_frequency = frequencies[band_mask][peak_index]
    noise_ratio = np.mean(band_amplitudes) / peak_amplitude
    return peak_frequency, peak_amplitude, noise_ratio


def process_recording(filename):
    """Analyze one recording from RECORDINGS_DIR and print every detection.

    Args:
        filename: Name of the file inside RECORDINGS_DIR. Without its
            extension it must match ``%Y-%m-%d_%H-%M-%S.%f%z`` (timestamp of
            the recording start with fractional seconds and UTC offset).

    Raises:
        ValueError: If the filename does not match the expected timestamp
            format.
    """
    print('processing', filename)

    # get the recording start date (microsecond resolution) from the filename
    date_string_from_filename = os.path.splitext(filename)[0]
    recording_date = datetime.datetime.strptime(date_string_from_filename, "%Y-%m-%d_%H-%M-%S.%f%z")

    # get samplerate and blocksize
    path = os.path.join(RECORDINGS_DIR, filename)
    samplerate = soundfile.info(path).samplerate
    block_samples = int(BLOCK_SECONDS * samplerate)
    overlap_samples = int(block_samples * BLOCK_OVERLAP_FRACTION)
    # Consecutive blocks start this many frames apart, not a full block apart.
    hop_samples = block_samples - overlap_samples

    # iterate blocks
    blocks = soundfile.blocks(path, blocksize=block_samples, overlap=overlap_samples)
    for num, block in enumerate(blocks):
        result = _analyze_block(block, samplerate)
        if result is None:
            continue
        max_freq, max_amplitude, noise = result

        # check for detection criteria
        max_freq_detected = DETECT_FREQUENCY_FROM <= max_freq <= DETECT_FREQUENCY_TO
        amplitude_detected = max_amplitude > AMPLITUDE_THRESHOLD
        low_noise_detected = noise < NOISE_RATIO_THRESHOLD

        # conclude detection
        if max_freq_detected and amplitude_detected and low_noise_detected:
            block_date = recording_date + datetime.timedelta(seconds=num * hop_samples / samplerate)
            print(f'{block_date}: {max_amplitude:.1f}rDB @ {max_freq:.1f}Hz ({noise:.3f}rDB noise)')


def main():
    """Create the working directories and process every FLAC recording."""
    os.makedirs(RECORDINGS_DIR, exist_ok=True)
    # NOTE: this directory is only created here; nothing in this script moves
    # files into it.
    os.makedirs(PROCESSED_RECORDINGS_DIR, exist_ok=True)

    for filename in os.listdir(RECORDINGS_DIR):
        if filename.endswith(".flac"):
            process_recording(filename)


if __name__ == "__main__":
    main()