CroneKorkN 2025-05-30 19:12:51 +02:00
parent f99412c537
commit bf16f740cd
Signed by: cronekorkn
SSH key fingerprint: SHA256:v0410ZKfuO1QHdgKBsdQNF64xmTxOF8osF1LIqwTcVw


@@ -1,107 +1,92 @@
#!/usr/bin/env python3
"""
Erkennt 211 Hz + 422 Hz (Oberton) in WAV-Dateien.
Speichert WAV + PNG nur bei Erkennung.
Blockiert Folgetreffer für definierte Zeit (SKIP_SECONDS).
"""
import numpy as np
import soundfile as sf
from scipy.fft import fft, fftfreq
import matplotlib.pyplot as plt
import os
+import datetime
+import numpy as np
+import matplotlib.pyplot as plt
+import soundfile as sf
+import scipy.signal
+from datetime import datetime
+import shutil
-# === Configuration ===
-FILENAME = "1b.flac"
-TARGET_FREQ = 211
-OCTAVE_FREQ = TARGET_FREQ * 2
-TOLERANCE = 1
-THRESHOLD_BASE = 0.3
-THRESHOLD_OCT = THRESHOLD_BASE / 10
-CHUNK_SECONDS = 2
-CLIP_PADDING_BEFORE = 2
-CLIP_PADDING_AFTER = 8
-SKIP_SECONDS = 10
-OUTDIR = "events"
+OUTDIR = "chunks_unprocessed"
+PROCESSED_DIR = "chunks_processed"
os.makedirs(OUTDIR, exist_ok=True)
+def process_chunk(filename):
+    input_path = os.path.join(OUTDIR, filename)
+    print(f"🔍 Processing {input_path}...")
-# === Load WAV/audio file ===
-data, rate = sf.read(FILENAME, dtype='float32')
-if data.ndim > 1:
-    data = data.mean(axis=1)
+    # Frequency analysis and event detection
+    data, samplerate = sf.read(input_path)
+    if data.ndim > 1:
+        data = data[:, 0]  # channel 1 only
-samples_per_chunk = int(rate * CHUNK_SECONDS)
-total_chunks = len(data) // samples_per_chunk
+    CHUNK_SECONDS = 1
+    TOLERANCE = 1
+    THRESHOLD_BASE = 0.5
+    THRESHOLD_OCT = THRESHOLD_BASE / 10
+    CLIP_PADDING_BEFORE = 1
+    CLIP_PADDING_AFTER = 6
+    TARGET_FREQ = 211
+    OVERTONE_FREQ = TARGET_FREQ * 2
+    NFFT = 32768
+    SKIP_SECONDS = 10
-detections = []
-next_allowed_time = 0  # for skip logic
+    chunk_samples = int(CHUNK_SECONDS * samplerate)
+    skip_samples = int(SKIP_SECONDS * samplerate)
+    padding_before = int(CLIP_PADDING_BEFORE * samplerate)
+    padding_after = int(CLIP_PADDING_AFTER * samplerate)
-# === Analysis loop ===
-for i in range(total_chunks):
-    timestamp = i * CHUNK_SECONDS
-    if timestamp < next_allowed_time:
-        continue
+    def detect_event(chunk):
+        freqs, times, Sxx = scipy.signal.spectrogram(chunk, samplerate, nperseg=NFFT)
+        idx_base = np.where((freqs >= TARGET_FREQ - TOLERANCE) & (freqs <= TARGET_FREQ + TOLERANCE))[0]
+        idx_oct = np.where((freqs >= OVERTONE_FREQ - TOLERANCE) & (freqs <= OVERTONE_FREQ + TOLERANCE))[0]
+        if len(idx_base) == 0 or len(idx_oct) == 0:
+            return False
+        base_energy = np.mean(Sxx[idx_base])
+        oct_energy = np.mean(Sxx[idx_oct])
+        total_energy = np.mean(Sxx, axis=0).max()
+        return base_energy > THRESHOLD_BASE * total_energy and oct_energy > THRESHOLD_OCT * total_energy
-    segment = data[i * samples_per_chunk : (i + 1) * samples_per_chunk]
-    if len(segment) == 0:
-        continue
+    i = 0
+    last_event = -skip_samples
+    while i + chunk_samples <= len(data):
+        chunk = data[i:i+chunk_samples]
+        if i - last_event >= skip_samples and detect_event(chunk):
+            clip_start = max(0, i - padding_before)
+            clip_end = min(len(data), i + chunk_samples + padding_after)
+            clip = data[clip_start:clip_end]
-    freqs = fftfreq(len(segment), d=1/rate)
-    fft_vals = np.abs(fft(segment))
+            event_time = datetime.now().strftime("%Y%m%d-%H%M%S")
+            base_name = os.path.splitext(filename)[0]
+            wav_out = os.path.join(PROCESSED_DIR, f"{base_name}_{event_time}.wav")
+            png_out = os.path.join(PROCESSED_DIR, f"{base_name}_{event_time}.png")
+            sf.write(wav_out, clip, samplerate)
-    pos_mask = freqs > 0
-    freqs = freqs[pos_mask]
-    fft_vals = fft_vals[pos_mask]
+            plt.figure()
+            plt.specgram(clip, Fs=samplerate, NFFT=NFFT, noverlap=NFFT//2, cmap='inferno', vmin=-90, vmax=-20)
+            plt.title(f"Spectrogram: {base_name}_{event_time}")
+            plt.xlabel("Time (s)")
+            plt.ylabel("Frequency (Hz)")
+            plt.colorbar(label="dB")
+            plt.savefig(png_out)
+            plt.close()
-    peak_freq = freqs[np.argmax(fft_vals)]
-    peak_mag = np.max(fft_vals)
+            print(f"🎯 Event detected at {event_time}, saved: {wav_out}, {png_out}")
+            last_event = i
+            i += skip_samples
+        else:
+            i += chunk_samples
-    # Normalized energies
-    mask_base = (freqs >= TARGET_FREQ - TOLERANCE) & (freqs <= TARGET_FREQ + TOLERANCE)
-    energy_base = np.mean(fft_vals[mask_base]) / peak_mag
+    # Move file
+    output_path = os.path.join(PROCESSED_DIR, filename)
+    shutil.move(input_path, output_path)
+    print(f"✅ Moved to {output_path}")
-    mask_oct = (freqs >= OCTAVE_FREQ - TOLERANCE) & (freqs <= OCTAVE_FREQ + TOLERANCE)
-    energy_oct = np.mean(fft_vals[mask_oct]) / peak_mag
-    is_peak_near_target = TARGET_FREQ - TOLERANCE <= peak_freq <= TARGET_FREQ + TOLERANCE
-    detected = is_peak_near_target and energy_base > THRESHOLD_BASE and energy_oct > THRESHOLD_OCT
+def main():
+    for filename in os.listdir(OUTDIR):
+        if filename.endswith(".flac"):
+            process_chunk(filename)
-    if detected:
-        detections.append((timestamp, round(energy_base, 4), round(energy_oct, 4), round(peak_freq, 2)))
-        next_allowed_time = timestamp + SKIP_SECONDS
-        # Extract clip
-        start = max(0, int((timestamp - CLIP_PADDING_BEFORE) * rate))
-        end = min(len(data), int((timestamp + CLIP_PADDING_AFTER) * rate))
-        clip = (data[start:end] * 32767).astype(np.int16)
-        base_filename = os.path.join(OUTDIR, f"event_{int(timestamp):04}s")
-        wav_name = f"{base_filename}.wav"
-        png_name = f"{base_filename}.png"
-        # Save WAV
-        sf.write(wav_name, clip, rate, subtype="PCM_24")
-        print(f"🟢 WAV saved: {wav_name} (211Hz: {energy_base:.4f}, 422Hz: {energy_oct:.4f}, Peak: {peak_freq:.1f} Hz)")
-        # PNG spectrogram
-        plt.figure(figsize=(10, 4))
-        # Artificially boost the signal to make weak events more visible in the dB spectrum
-        plt.specgram((clip / 32767.0), NFFT=32768, Fs=rate, noverlap=512, cmap="plasma", vmin=-80, vmax=-35)
-        plt.title(f"Event @ {timestamp:.2f}s")
-        plt.xlabel("Time (s)")
-        plt.ylabel("Frequency (Hz)")
-        plt.ylim(0, 1000)
-        plt.colorbar(label="Intensity (dB)")
-        plt.tight_layout()
-        plt.savefig(png_name)
-        plt.close()
-        print(f"📷 PNG saved: {png_name}")
-# === Summary ===
-print("\n🎯 Detections:")
-for ts, eb, eo, pf in detections:
-    print(f"- {ts:.2f}s | 211Hz: {eb} | 422Hz: {eo} | Peak: {pf:.1f} Hz")
-if not detections:
-    print("→ No valid events detected.")
+if __name__ == "__main__":
+    main()
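
The rewritten detection no longer looks at a single FFT peak: detect_event() averages the spectrogram energy in a ±TOLERANCE band around 211 Hz and around the 422 Hz overtone and compares both against the loudest spectrogram column. That rule can be sanity-checked without any recordings. The standalone sketch below is not part of the commit; it copies the detection rule and feeds it a synthetic 211 Hz + 422 Hz tone. The 48 kHz sample rate, the tone amplitudes, and the 1 kHz counter-example are assumptions chosen only for illustration.

#!/usr/bin/env python3
# Standalone sanity check for the 211 Hz + 422 Hz rule (illustrative, not part of the commit).
import numpy as np
import scipy.signal

SAMPLERATE = 48000               # assumed; the real script uses whatever soundfile reports
TARGET_FREQ = 211
OVERTONE_FREQ = TARGET_FREQ * 2
TOLERANCE = 1
THRESHOLD_BASE = 0.5
THRESHOLD_OCT = THRESHOLD_BASE / 10
NFFT = 32768

def detect_event(chunk, samplerate=SAMPLERATE):
    # Same rule as in process_chunk(): mean energy in the ±1 Hz bands around the
    # fundamental and the overtone, relative to the loudest spectrogram column.
    freqs, times, Sxx = scipy.signal.spectrogram(chunk, samplerate, nperseg=NFFT)
    idx_base = np.where((freqs >= TARGET_FREQ - TOLERANCE) & (freqs <= TARGET_FREQ + TOLERANCE))[0]
    idx_oct = np.where((freqs >= OVERTONE_FREQ - TOLERANCE) & (freqs <= OVERTONE_FREQ + TOLERANCE))[0]
    if len(idx_base) == 0 or len(idx_oct) == 0:
        return False
    base_energy = np.mean(Sxx[idx_base])
    oct_energy = np.mean(Sxx[idx_oct])
    total_energy = np.mean(Sxx, axis=0).max()
    return base_energy > THRESHOLD_BASE * total_energy and oct_energy > THRESHOLD_OCT * total_energy

if __name__ == "__main__":
    rng = np.random.default_rng(0)
    t = np.arange(SAMPLERATE) / SAMPLERATE                  # 1 s of signal, matching CHUNK_SECONDS = 1
    noise = 0.01 * rng.standard_normal(len(t))
    target = 0.5 * np.sin(2 * np.pi * TARGET_FREQ * t) + 0.05 * np.sin(2 * np.pi * OVERTONE_FREQ * t)
    other = 0.5 * np.sin(2 * np.pi * 1000 * t)               # tone far away from 211/422 Hz
    print("bin spacing:", SAMPLERATE / NFFT, "Hz")           # ~1.46 Hz, so each ±1 Hz band spans one or two bins
    print("211 Hz + 422 Hz tone:", detect_event(target + noise))   # expected: True
    print("1 kHz tone only:     ", detect_event(other + noise))    # expected: False

At lower sample rates a 1-second chunk is shorter than NFFT = 32768 samples; scipy then shrinks nperseg to the chunk length and warns, which widens the bins and makes the ±1 Hz tolerance correspondingly coarser.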
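
One operational note when trying the new layout: the hunk only creates OUTDIR ("chunks_unprocessed"), while sf.write() and shutil.move() both target PROCESSED_DIR ("chunks_processed"). Unless that directory is created elsewhere, for example by the deployment, a small guard next to the existing os.makedirs() call, sketched below as an assumption rather than part of the commit, avoids a failure on the first processed file.

# Hypothetical guard (not in the commit): ensure the output directory exists
# before process_chunk() writes WAV/PNG files or moves chunks into it.
os.makedirs(PROCESSED_DIR, exist_ok=True)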