135 lines
No EOL
5.1 KiB
Python
Executable file
135 lines
No EOL
5.1 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
import os
|
|
import concurrent.futures
|
|
import datetime
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
import soundfile
|
|
import scipy.signal
|
|
from scipy.fft import rfft, rfftfreq
|
|
import shutil
|
|
|
|
# --- Directories ---------------------------------------------------------------
RECORDINGS_DIR = "recordings"
# Derived from RECORDINGS_DIR so the two paths cannot drift apart.
PROCESSED_RECORDINGS_DIR = os.path.join(RECORDINGS_DIR, "processed")
DETECTIONS_DIR = "events"

# --- Detection parameters ------------------------------------------------------
DETECT_FREQUENCY = 211  # Hz, centre of the tone to detect
DETECT_FREQUENCY_TOLERANCE = 2  # Hz, accepted deviation on either side
DETECT_FREQUENCY_FROM = DETECT_FREQUENCY - DETECT_FREQUENCY_TOLERANCE  # Hz
DETECT_FREQUENCY_TO = DETECT_FREQUENCY + DETECT_FREQUENCY_TOLERANCE  # Hz

# Widens the analysed band to [FROM / factor, TO * factor] so the peak can be
# compared against its spectral surroundings.
ADJACENCY_FACTOR = 2

# Uncalibrated relative amplitude (printed as "rDB"). process_recording compares
# it against a block's FFT magnitude divided by the recording's mean FFT
# magnitude, i.e. a linear ratio rather than a decibel value.
AMPLITUDE_THRESHOLD = 200

BLOCK_SECONDS = 3  # seconds per block (longer: finer frequency, coarser time resolution)
DETECTION_DISTANCE = 30  # seconds (minimum time between detections)
BLOCK_OVERLAP_FACTOR = 0.8  # fraction shared by consecutive blocks (0.8 = 80%); keep below 1
|
|
|
|
|
|
def process_recording(filename):
    """Scan one recording in RECORDINGS_DIR for a tone near DETECT_FREQUENCY.

    The audio is cut into overlapping blocks of BLOCK_SECONDS. Within the band
    [DETECT_FREQUENCY_FROM / ADJACENCY_FACTOR, DETECT_FREQUENCY_TO * ADJACENCY_FACTOR]
    a block counts as a detection when all of the following hold:

    * the strongest bin lies between DETECT_FREQUENCY_FROM and DETECT_FREQUENCY_TO,
    * its amplitude, normalized by the mean spectral magnitude of the whole
      recording, exceeds AMPLITUDE_THRESHOLD,
    * the mean amplitude of the band is below 10% of that peak.

    Consecutive detecting blocks are merged into one event. When an event ends
    (or the recording ends while one is active) a summary line is printed and
    write_clip() / write_plot() are called.

    Args:
        filename: Name of a file inside RECORDINGS_DIR. The name without its
            extension must be a timestamp formatted as
            ``%Y-%m-%d_%H-%M-%S.%f%z``.

    Raises:
        ValueError: If the file name does not match the timestamp format.
    """
    print('processing', filename)

    # The file name (minus extension) carries the recording start time.
    date_string_from_filename = os.path.splitext(filename)[0]
    recording_date = datetime.datetime.strptime(date_string_from_filename, "%Y-%m-%d_%H-%M-%S.%f%z")

    path = os.path.join(RECORDINGS_DIR, filename)
    sound, samplerate = soundfile.read(path)
    if sound.ndim > 1:
        # Multi-channel files come back as (frames, channels); rfft works on the
        # last axis, so without a mono mix-down it would transform across
        # channels instead of across time.
        sound = sound.mean(axis=1)
    if len(sound) == 0:
        print('skipping', filename, '(no audio data)')
        return

    samples_per_block = int(BLOCK_SECONDS * samplerate)
    overlapping_samples = int(samples_per_block * BLOCK_OVERLAP_FACTOR)
    # Distance between the starts of two consecutive blocks; never below one
    # sample so the loop always advances.
    hop_samples = max(1, samples_per_block - overlapping_samples)

    # Mean spectral magnitude of the whole recording, used to normalize blocks.
    base_amplitude = np.mean(np.abs(rfft(sound)))
    print(f'base amplitude: {base_amplitude:.5f}rDB')
    if not base_amplitude > 0:
        # A silent recording has nothing to normalize against and no events.
        return

    band_from = DETECT_FREQUENCY_FROM / ADJACENCY_FACTOR
    band_to = DETECT_FREQUENCY_TO * ADJACENCY_FACTOR
    max_noise_ratio = 0.1  # band mean must stay below this fraction of the peak

    # The block index advances by one hop, not by BLOCK_SECONDS, so the pause
    # after an event has to be converted from seconds into hops.
    skip_blocks = int(DETECTION_DISTANCE * samplerate) // hop_samples
    total_blocks = len(sound) // hop_samples

    # Event currently in progress, or None while nothing is being detected.
    current_event = None

    block_num = 0
    while block_num < total_blocks:
        start_sample = block_num * hop_samples
        # Blocks near the end of the recording may be shorter than samples_per_block.
        block = sound[start_sample:start_sample + samples_per_block]
        block_date = recording_date + datetime.timedelta(seconds=start_sample / samplerate)

        labels = rfftfreq(len(block), d=1 / samplerate)
        amplitudes = np.abs(rfft(block)) / base_amplitude

        # Keep only the band around the target frequency.
        in_band = (labels >= band_from) & (labels <= band_to)
        adjacent_amplitudes = amplitudes[in_band]
        adjacent_labels = labels[in_band]

        detected = False
        # A very short trailing block can have no bin inside the band at all.
        if adjacent_amplitudes.size:
            peak_index = np.argmax(adjacent_amplitudes)
            max_amplitude = adjacent_amplitudes[peak_index]
            max_freq = adjacent_labels[peak_index]
            # The noise ratio is evaluated last, so the division only happens
            # once the peak is known to exceed the (positive) threshold.
            detected = bool(
                DETECT_FREQUENCY_FROM <= max_freq <= DETECT_FREQUENCY_TO
                and max_amplitude > AMPLITUDE_THRESHOLD
                and np.mean(adjacent_amplitudes) / max_amplitude < max_noise_ratio
            )

        if detected:
            if current_event is None:
                current_event = {
                    'start_at': block_date,
                    'end_at': block_date,
                    'start_freq': max_freq,
                    'end_freq': max_freq,
                    'max_amplitude': max_amplitude,
                }
            else:
                current_event.update({
                    'end_at': block_date,
                    'end_freq': max_freq,
                    'max_amplitude': max(max_amplitude, current_event['max_amplitude']),
                })
        elif current_event is not None:
            _report_event(current_event)
            current_event = None
            block_num += skip_blocks

        block_num += 1

    # An event that is still active when the recording ends must be reported too.
    if current_event is not None:
        _report_event(current_event)


def _report_event(event):
    """Print the summary line of a finished event and emit its clip and plot."""
    duration = (event['end_at'] - event['start_at']).total_seconds()
    started = event['start_at'].strftime('%Y-%m-%d %H:%M:%S')
    # The outer quotes differ from the ones used inside the replacement fields:
    # reusing the same quote character is only valid from Python 3.12 on.
    print(
        f"🔊 {started} ({duration:.1f}s): "
        f"{event['start_freq']:.1f}Hz->{event['end_freq']:.1f}Hz "
        f"@{event['max_amplitude']:.0f}rDB"
    )

    write_clip()
    write_plot()
|
|
|
|
|
|
def write_clip():
    """Placeholder for writing an audio clip of a detected event; does nothing yet."""
    return None
|
|
|
|
|
|
def write_plot():
    """Placeholder for writing a plot of a detected event; does nothing yet."""
    return None
|
|
|
|
|
|
def main():
    """Create the working directories and process every ``.flac`` recording.

    Both RECORDINGS_DIR and PROCESSED_RECORDINGS_DIR are created if missing.
    Each file in RECORDINGS_DIR whose name ends with ``.flac`` is then passed
    to process_recording().
    """
    os.makedirs(RECORDINGS_DIR, exist_ok=True)
    os.makedirs(PROCESSED_RECORDINGS_DIR, exist_ok=True)

    # os.listdir returns entries in arbitrary order; sorting makes runs
    # reproducible and, for timestamp-named files sharing one UTC offset,
    # chronological.
    for filename in sorted(os.listdir(RECORDINGS_DIR)):
        if filename.endswith(".flac"):
            process_recording(filename)
|
|
|
|
|
|
# Run the batch only when executed as a script, not when imported.
if __name__ == "__main__":
    main()