bootshorn_investigation/process_chunks.py
2025-05-30 19:39:57 +02:00

100 lines
No EOL
3.5 KiB
Python
Executable file

#!/usr/bin/env python3
import os
import concurrent.futures
import datetime
import numpy as np
import matplotlib.pyplot as plt
import soundfile as sf
import scipy.signal
from scipy.fft import fft, fftfreq
from datetime import datetime
import shutil
# --- File locations -------------------------------------------------------
INPUT_DIR = "chunks"              # directory scanned for *.flac recordings
OUTPUT_DIR = "chunks/processed"   # clips, plots and processed inputs go here

# --- Analysis window ------------------------------------------------------
CHUNK_SECONDS = 1    # length of each analysed window, in seconds
SKIP_SECONDS = 10    # distance jumped ahead after a detection, in seconds
NFFT = 32768         # segment length for the spectrogram / specgram plot

# --- Tone being searched for (Hz) -----------------------------------------
TARGET_FREQ = 211
OVERTONE_FREQ = 2 * TARGET_FREQ
TOLERANCE = 1                        # half-width of the band around TARGET_FREQ
OVERTONE_TOLERANCE = 2 * TOLERANCE   # half-width of the band around OVERTONE_FREQ

# --- Energy thresholds (fractions of the reference energy) ----------------
THRESHOLD_BASE = 0.5
THRESHOLD_OCT = THRESHOLD_BASE / 10

# --- Context kept around a detected window, in seconds --------------------
CLIP_PADDING_BEFORE = 1
CLIP_PADDING_AFTER = 6
def detect_event(chunk, samplerate):
    """Report whether *chunk* contains the target tone together with its overtone.

    Three conditions must all hold:

    * the strongest FFT bin of the chunk lies within TOLERANCE of TARGET_FREQ,
    * the mean spectrogram value in the TARGET_FREQ band exceeds
      THRESHOLD_BASE times the reference energy,
    * the mean spectrogram value in the OVERTONE_FREQ band exceeds
      THRESHOLD_OCT times the reference energy.

    The reference energy is the largest per-time-segment mean of the
    spectrogram.

    Args:
        chunk: one-dimensional array of audio samples.
        samplerate: sampling rate of *chunk* in samples per second.

    Returns:
        A truthy value when all conditions hold; ``False`` when either
        frequency band contains no spectrogram bin.
    """
    freq_axis, _, power = scipy.signal.spectrogram(chunk, samplerate, nperseg=NFFT)

    base_lo, base_hi = TARGET_FREQ - TOLERANCE, TARGET_FREQ + TOLERANCE
    oct_lo = OVERTONE_FREQ - OVERTONE_TOLERANCE
    oct_hi = OVERTONE_FREQ + OVERTONE_TOLERANCE

    # Boolean masks over the frequency axis selecting each band of interest.
    base_band = (freq_axis >= base_lo) & (freq_axis <= base_hi)
    oct_band = (freq_axis >= oct_lo) & (freq_axis <= oct_hi)
    if not (base_band.any() and oct_band.any()):
        return False

    base_energy = power[base_band].mean()
    oct_energy = power[oct_band].mean()
    # Reference level: mean over frequency for each time segment, then the
    # loudest segment.
    total_energy = power.mean(axis=0).max()

    # Dominant frequency of the whole chunk from a plain FFT.
    magnitudes = np.abs(fft(chunk))
    bin_freqs = fftfreq(len(chunk), 1/samplerate)
    peak_freq = bin_freqs[magnitudes.argmax()]

    peak_on_target = base_lo <= peak_freq <= base_hi
    base_loud_enough = base_energy > THRESHOLD_BASE * total_energy
    oct_loud_enough = oct_energy > THRESHOLD_OCT * total_energy
    return peak_on_target and base_loud_enough and oct_loud_enough
def _unused_clip_stem(stem):
    """Return *stem*, or *stem* with a numeric suffix, not yet used in OUTPUT_DIR.

    A stem counts as used when either its ``.wav`` or its ``.png`` file
    already exists, so a clip and its plot always share one name.
    """
    candidate = stem
    counter = 1
    while any(
        os.path.exists(os.path.join(OUTPUT_DIR, candidate + ext))
        for ext in (".wav", ".png")
    ):
        candidate = f"{stem}_{counter}"
        counter += 1
    return candidate


def process_chunk(filename):
    """Scan one recording for events, export each hit, then archive the file.

    The file ``INPUT_DIR/filename`` is read and its first channel is walked in
    windows of CHUNK_SECONDS. For every window accepted by ``detect_event`` a
    padded audio clip (``.wav``) and a spectrogram image (``.png``) are
    written to OUTPUT_DIR, and scanning jumps ahead by SKIP_SECONDS. Finally
    the input file itself is moved into OUTPUT_DIR.

    Args:
        filename: name of the recording, relative to INPUT_DIR.
    """
    input_path = os.path.join(INPUT_DIR, filename)
    print(f"🔍 Verarbeite {input_path}...")

    # Neither sf.write nor shutil.move creates missing directories;
    # exist_ok keeps this safe when several workers start at once.
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # Frequency analysis and event detection
    data, samplerate = sf.read(input_path)
    if data.ndim > 1:
        data = data[:, 0]  # first channel only

    chunk_samples = int(CHUNK_SECONDS * samplerate)
    skip_samples = int(SKIP_SECONDS * samplerate)
    padding_before = int(CLIP_PADDING_BEFORE * samplerate)
    padding_after = int(CLIP_PADDING_AFTER * samplerate)
    base_name = os.path.splitext(filename)[0]

    i = 0
    last_event = -skip_samples  # lets the very first window qualify
    while i + chunk_samples <= len(data):
        chunk = data[i:i + chunk_samples]
        if i - last_event >= skip_samples and detect_event(chunk, samplerate):
            clip_start = max(0, i - padding_before)
            clip_end = min(len(data), i + chunk_samples + padding_after)
            clip = data[clip_start:clip_end]

            event_time = datetime.now().strftime("%Y%m%d-%H%M%S")
            # The timestamp only has one-second resolution, so several events
            # found within the same second would otherwise overwrite each
            # other's files.
            stem = _unused_clip_stem(f"{base_name}_{event_time}")
            wav_out = os.path.join(OUTPUT_DIR, f"{stem}.wav")
            png_out = os.path.join(OUTPUT_DIR, f"{stem}.png")

            sf.write(wav_out, clip, samplerate)

            fig = plt.figure()
            try:
                plt.specgram(clip, Fs=samplerate, NFFT=NFFT, noverlap=NFFT//2, cmap='inferno', vmin=-90, vmax=-20)
                plt.title(f"Spectrogram: {stem}")
                plt.xlabel("Time (s)")
                plt.ylabel("Frequency (Hz)")
                plt.colorbar(label="dB")
                plt.savefig(png_out)
            finally:
                # Release the figure even if plotting or saving failed.
                plt.close(fig)

            print(f"🎯 Ereignis erkannt bei {event_time}, gespeichert: {wav_out}, {png_out}")
            last_event = i
            i += skip_samples
        else:
            i += chunk_samples

    # Move the processed input file out of the way
    output_path = os.path.join(OUTPUT_DIR, filename)
    shutil.move(input_path, output_path)
    print(f"✅ Verschoben nach {output_path}")
def main():
    """Run ``process_chunk`` on every ``.flac`` file in INPUT_DIR in parallel.

    Each file is submitted to a process pool as its own task. A failure in
    one worker is reported on stdout and does not stop the remaining files
    from being processed.
    """
    files = [f for f in os.listdir(INPUT_DIR) if f.endswith(".flac")]
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # submit() instead of map(): the iterator returned by map() only
        # raises worker exceptions when consumed, so an unconsumed map()
        # silently discards every failure.
        pending = {executor.submit(process_chunk, name): name for name in files}
        for future in concurrent.futures.as_completed(pending):
            error = future.exception()
            if error is not None:
                print(f"❌ Fehler bei {pending[future]}: {error!r}")


if __name__ == "__main__":
    main()