165 lines
		
	
	
		
			No EOL
		
	
	
		
			7 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
			
		
		
	
	
			165 lines
		
	
	
		
			No EOL
		
	
	
		
			7 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
| #!/usr/bin/env python3
 | |
| 
 | |
| import os
 | |
| import datetime
 | |
| import numpy as np
 | |
| import matplotlib.pyplot as plt
 | |
| import soundfile as sf
 | |
| from scipy.fft import rfft, rfftfreq
 | |
| import shutil
 | |
| import traceback
 | |
| 
 | |
| 
 | |
| RECORDINGS_DIR = "recordings"
 | |
| PROCESSED_RECORDINGS_DIR = "recordings/processed"
 | |
| DETECTIONS_DIR = "events"
 | |
| 
 | |
| DETECT_FREQUENCY = 211 # Hz
 | |
| DETECT_FREQUENCY_TOLERANCE = 2 # Hz
 | |
| ADJACENCY_FACTOR = 2 # area to look for the frequency (e.g. 2 means 100Hz to 400Hz for 200Hz detection)
 | |
| BLOCK_SECONDS = 3 # seconds (longer means more frequency resolution, but less time resolution)
 | |
| DETECTION_DISTANCE_SECONDS = 30 # seconds (minimum time between detections)
 | |
| BLOCK_OVERLAP_FACTOR = 0.9 # overlap between blocks (0.2 means 20% overlap)
 | |
| MIN_SIGNAL_QUALITY = 1000.0 # maximum noise level (relative DB) to consider a detection valid
 | |
| PLOT_PADDING_START_SECONDS = 2 # seconds (padding before and after the event in the plot)
 | |
| PLOT_PADDING_END_SECONDS = 3 # seconds (padding before and after the event in the plot)
 | |
| 
 | |
| DETECTION_DISTANCE_BLOCKS = DETECTION_DISTANCE_SECONDS // BLOCK_SECONDS # number of blocks to skip after a detection
 | |
| DETECT_FREQUENCY_FROM = DETECT_FREQUENCY - DETECT_FREQUENCY_TOLERANCE # Hz
 | |
| DETECT_FREQUENCY_TO = DETECT_FREQUENCY + DETECT_FREQUENCY_TOLERANCE # Hz
 | |
| 
 | |
| 
 | |
| def process_recording(filename):
 | |
|     print('processing', filename)
 | |
| 
 | |
|     # get ISO 8601 nanosecond recording date from filename
 | |
|     date_string_from_filename = os.path.splitext(filename)[0]
 | |
|     recording_date = datetime.datetime.strptime(date_string_from_filename, "%Y-%m-%d_%H-%M-%S.%f%z")
 | |
| 
 | |
|     # get data and metadata from recording
 | |
|     path = os.path.join(RECORDINGS_DIR, filename)
 | |
|     soundfile = sf.SoundFile(path)
 | |
|     samplerate = soundfile.samplerate
 | |
|     samples_per_block = int(BLOCK_SECONDS * samplerate)
 | |
|     overlapping_samples = int(samples_per_block * BLOCK_OVERLAP_FACTOR)
 | |
| 
 | |
|     sample_num = 0
 | |
|     current_event = None
 | |
| 
 | |
|     while sample_num < len(soundfile):
 | |
|         soundfile.seek(sample_num)
 | |
|         block = soundfile.read(frames=samples_per_block, dtype='float32', always_2d=False)
 | |
| 
 | |
|         if len(block) == 0:
 | |
|             break
 | |
| 
 | |
|         # calculate FFT
 | |
|         labels = rfftfreq(len(block), d=1/samplerate)
 | |
|         complex_amplitudes = rfft(block)
 | |
|         amplitudes = np.abs(complex_amplitudes)
 | |
| 
 | |
|         # get the frequency with the highest amplitude within the search range
 | |
|         search_amplitudes = amplitudes[(labels >= DETECT_FREQUENCY_FROM/ADJACENCY_FACTOR) & (labels <= DETECT_FREQUENCY_TO*ADJACENCY_FACTOR)]
 | |
|         search_labels = labels[(labels >= DETECT_FREQUENCY_FROM/ADJACENCY_FACTOR) & (labels <= DETECT_FREQUENCY_TO*ADJACENCY_FACTOR)]
 | |
|         max_amplitude = max(search_amplitudes)
 | |
|         max_amplitude_index = np.argmax(search_amplitudes)
 | |
|         max_freq = search_labels[max_amplitude_index]
 | |
|         max_freq_detected = DETECT_FREQUENCY_FROM <= max_freq <= DETECT_FREQUENCY_TO
 | |
| 
 | |
|         # calculate signal quality
 | |
|         adjacent_amplitudes = amplitudes[(labels < DETECT_FREQUENCY_FROM) | (labels > DETECT_FREQUENCY_TO)]
 | |
|         signal_quality = max_amplitude/np.mean(adjacent_amplitudes)
 | |
|         good_signal_quality = signal_quality > MIN_SIGNAL_QUALITY
 | |
| 
 | |
|         # conclude detection
 | |
|         if (
 | |
|             max_freq_detected and
 | |
|             good_signal_quality
 | |
|         ):
 | |
|             block_date = recording_date + datetime.timedelta(seconds=sample_num / samplerate)
 | |
| 
 | |
|             # detecting an event
 | |
|             if not current_event:
 | |
|                 current_event = {
 | |
|                     'start_at': block_date,
 | |
|                     'end_at': block_date,
 | |
|                     'start_sample': sample_num,
 | |
|                     'end_sample': sample_num + samples_per_block,
 | |
|                     'start_freq': max_freq,
 | |
|                     'end_freq': max_freq,
 | |
|                     'max_amplitude': max_amplitude,
 | |
|                 }
 | |
|             else:
 | |
|                 current_event.update({
 | |
|                     'end_at': block_date,
 | |
|                     'end_freq': max_freq,
 | |
|                     'end_sample': sample_num + samples_per_block,
 | |
|                     'max_amplitude': max(max_amplitude, current_event['max_amplitude']),
 | |
|                 })
 | |
|             print(f'- {block_date.strftime('%Y-%m-%d %H:%M:%S')}: {max_amplitude:.1f}rDB @ {max_freq:.1f}Hz (signal {signal_quality:.3f}x)')
 | |
|         else:
 | |
|             # not detecting an event
 | |
|             if current_event:
 | |
|                 duration = (current_event['end_at'] - current_event['start_at']).total_seconds()
 | |
|                 current_event['duration'] = duration
 | |
|                 print(f'🔊 {current_event['start_at'].strftime('%Y-%m-%d %H:%M:%S')} ({duration:.1f}s): {current_event['start_freq']:.1f}Hz->{current_event['end_freq']:.1f}Hz @{current_event['max_amplitude']:.0f}rDB')
 | |
| 
 | |
|                 # read full audio clip again for writing
 | |
|                 write_event(current_event=current_event, soundfile=soundfile, samplerate=samplerate)
 | |
| 
 | |
|                 current_event = None
 | |
|                 sample_num += DETECTION_DISTANCE_BLOCKS * samples_per_block
 | |
| 
 | |
|         sample_num += samples_per_block - overlapping_samples
 | |
| 
 | |
|     # move to PROCESSED_RECORDINGS_DIR
 | |
| 
 | |
|     os.makedirs(PROCESSED_RECORDINGS_DIR, exist_ok=True)
 | |
|     shutil.move(os.path.join(RECORDINGS_DIR, filename), os.path.join(PROCESSED_RECORDINGS_DIR, filename))
 | |
| 
 | |
| 
 | |
| # write a spectrogram using the sound from start to end of the event
 | |
| def write_event(current_event, soundfile, samplerate):
 | |
|     # date and filename
 | |
|     event_date = current_event['start_at'] - datetime.timedelta(seconds=PLOT_PADDING_START_SECONDS)
 | |
|     filename_prefix = event_date.strftime('%Y-%m-%d_%H-%M-%S.%f%z')
 | |
| 
 | |
|     # event clip
 | |
|     event_start_sample = current_event['start_sample'] - samplerate * PLOT_PADDING_START_SECONDS
 | |
|     event_end_sample = current_event['end_sample'] + samplerate * PLOT_PADDING_END_SECONDS
 | |
|     total_samples = event_end_sample - event_start_sample
 | |
|     soundfile.seek(event_start_sample)
 | |
|     event_clip = soundfile.read(frames=total_samples, dtype='float32', always_2d=False)
 | |
| 
 | |
|     # write flac
 | |
|     flac_path = os.path.join(DETECTIONS_DIR, f"{filename_prefix}.flac")
 | |
|     sf.write(flac_path, event_clip, samplerate, format='FLAC')
 | |
| 
 | |
|     # write spectrogram
 | |
|     plt.figure(figsize=(8, 6))
 | |
|     plt.specgram(event_clip, Fs=samplerate, NFFT=samplerate, noverlap=samplerate//2, cmap='inferno', vmin=-100, vmax=-10)
 | |
|     plt.title(f"Bootshorn @{event_date.strftime('%Y-%m-%d %H:%M:%S%z')}")
 | |
|     plt.xlabel(f"Time {current_event['duration']:.1f}s")
 | |
|     plt.ylabel(f"Frequency {current_event['start_freq']:.1f}Hz -> {current_event['end_freq']:.1f}Hz")
 | |
|     plt.colorbar(label="Intensity (rDB)")
 | |
|     plt.ylim(50, 1000)
 | |
|     plt.savefig(os.path.join(DETECTIONS_DIR, f"{filename_prefix}.png"))
 | |
|     plt.close()
 | |
| 
 | |
| 
 | |
| def main():
 | |
|     os.makedirs(RECORDINGS_DIR, exist_ok=True)
 | |
|     os.makedirs(PROCESSED_RECORDINGS_DIR, exist_ok=True)
 | |
| 
 | |
|     for filename in sorted(os.listdir(RECORDINGS_DIR)):
 | |
|         if filename.endswith(".flac"):
 | |
|             try:
 | |
|                 process_recording(filename)
 | |
|             except Exception as e:
 | |
|                 print(f"Error processing {filename}: {e}")
 | |
|                 # print stacktrace
 | |
|                 traceback.print_exc()
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     main() |