#!/usr/bin/env python3 import os import concurrent.futures import datetime import numpy as np import matplotlib.pyplot as plt import soundfile import scipy.signal from scipy.fft import rfft, rfftfreq import shutil RECORDINGS_DIR = "recordings" PROCESSED_RECORDINGS_DIR = "recordings/processed" DETECTIONS_DIR = "events" DETECT_FREQUENCY = 211 # Hz DETECT_FREQUENCY_TOLERANCE = 2 # Hz DETECT_FREQUENCY_FROM = DETECT_FREQUENCY - DETECT_FREQUENCY_TOLERANCE # Hz DETECT_FREQUENCY_TO = DETECT_FREQUENCY + DETECT_FREQUENCY_TOLERANCE # Hz ADJACENCY_FACTOR = 2 # area to look for noise around the target frequency AMPLITUDE_THRESHOLD = 200 # rDB def process_recording(filename): print('processing', filename) # get ISO 8601 nanosecond recording date from filename date_string_from_filename = os.path.splitext(filename)[0] recording_date = datetime.datetime.strptime(date_string_from_filename, "%Y-%m-%d_%H-%M-%S.%f%z") # get samplerate and blocksize path = os.path.join(RECORDINGS_DIR, filename) info = soundfile.info(path) samplerate = info.samplerate blocksize = int(CLIP_SECONDS * samplerate) # iterate blocks for num, block in enumerate(soundfile.blocks(path, blocksize=blocksize, overlap=int(blocksize*0.8))): block_date = recording_date + datetime.timedelta(seconds=num * CLIP_SECONDS) complex_amplitudes = rfft(block) amplitudes = np.abs(complex_amplitudes) labels = rfftfreq(len(block), d=1/samplerate) # get amplitudes only between 100 and 1000 Hz adjacent_amplitudes = amplitudes[(labels >= DETECT_FREQUENCY_FROM/ADJACENCY_FACTOR) & (labels <= DETECT_FREQUENCY_TO*ADJACENCY_FACTOR)] adjacent_labels = labels[(labels >= DETECT_FREQUENCY_FROM/ADJACENCY_FACTOR) & (labels <= DETECT_FREQUENCY_TO*ADJACENCY_FACTOR)] # get the frequency with the highest amplitude max_amplitude = max(adjacent_amplitudes) max_amplitude_index = np.argmax(adjacent_amplitudes) max_freq = adjacent_labels[max_amplitude_index] # check for detection criteria max_freq_detected = DETECT_FREQUENCY_FROM <= max_freq <= DETECT_FREQUENCY_TO amplitude_detected = max_amplitude > AMPLITUDE_THRESHOLD # conclude detection if max_freq_detected and amplitude_detected: print(f'{block_date}: {max_amplitude:.2f}rDB @ {max_freq:.2f}Hz') def main(): os.makedirs(RECORDINGS_DIR, exist_ok=True) os.makedirs(PROCESSED_RECORDINGS_DIR, exist_ok=True) for filename in os.listdir(RECORDINGS_DIR): if filename.endswith(".flac"): process_recording(filename) if __name__ == "__main__": main()