CroneKorkN 2025-05-30 21:31:16 +02:00
parent abee103ed9
commit 788416adb6
Signed by: cronekorkn
SSH key fingerprint: SHA256:v0410ZKfuO1QHdgKBsdQNF64xmTxOF8osF1LIqwTcVw


@@ -7,15 +7,15 @@ import matplotlib.pyplot as plt
 import soundfile as sf
 import scipy.signal
 from scipy.fft import fft, fftfreq
-from datetime import datetime
 import shutil
 
-INPUT_DIR = "chunks"
-OUTPUT_DIR = "chunks/processed"
-CHUNK_SECONDS = 1
-TOLERANCE = 1
+CHUNK_DIR = "chunks"
+PROCESSED_CHUNK_DIR = "chunks/processed"
+EVENT_DIR = "events"
+SAMPLE_SECONDS = 1
+TOLERANCE = 2
 OVERTONE_TOLERANCE = TOLERANCE * 2
-THRESHOLD_BASE = 0.5
+THRESHOLD_BASE = 0.
 THRESHOLD_OCT = THRESHOLD_BASE / 10
 CLIP_PADDING_BEFORE = 1
 CLIP_PADDING_AFTER = 6
@@ -24,8 +24,29 @@ OVERTONE_FREQ = TARGET_FREQ * 2
 NFFT = 32768
 SKIP_SECONDS = 10
 
-def detect_event(chunk, samplerate):
-    freqs, times, Sxx = scipy.signal.spectrogram(chunk, samplerate, nperseg=NFFT)
+def process_chunk(filename):
+    input_path = os.path.join(CHUNK_DIR, filename)
+    print(f"🔍 Processing {input_path}...")
+    # frequency analysis and event detection
+    data, samplerate = sf.read(input_path)
+    if data.ndim > 1:
+        data = data[:, 0]  # channel 1 only
+    chunk_samples = int(SAMPLE_SECONDS * samplerate)
+    skip_samples = int(SKIP_SECONDS * samplerate)
+    padding_before = int(CLIP_PADDING_BEFORE * samplerate)
+    padding_after = int(CLIP_PADDING_AFTER * samplerate)
+    chunk_start_str = os.path.splitext(filename)[0]
+    chunk_start_dt = datetime.datetime.strptime(chunk_start_str, "%Y%m%d-%H%M%S")
+    i = 0
+    last_event = -skip_samples
+    while i + chunk_samples <= len(data):
+        clip = data[i:i+chunk_samples]
+        freqs, times, Sxx = scipy.signal.spectrogram(clip, samplerate, nperseg=NFFT)
         idx_base = np.where((freqs >= TARGET_FREQ - TOLERANCE) & (freqs <= TARGET_FREQ + TOLERANCE))[0]
         idx_oct = np.where((freqs >= OVERTONE_FREQ - OVERTONE_TOLERANCE) & (freqs <= OVERTONE_FREQ + OVERTONE_TOLERANCE))[0]
         if len(idx_base) == 0 or len(idx_oct) == 0:
@@ -34,45 +55,25 @@ def detect_event(chunk, samplerate):
         oct_energy = np.mean(Sxx[idx_oct])
         total_energy = np.mean(Sxx, axis=0).max()
 
-    fft_vals = np.abs(fft(chunk))
-    freqs = fftfreq(len(chunk), 1/samplerate)
+        fft_vals = np.abs(fft(clip))
+        freqs = fftfreq(len(clip), 1/samplerate)
         peak_freq = freqs[np.argmax(fft_vals)]
         is_peak_near_target = TARGET_FREQ - TOLERANCE <= peak_freq <= TARGET_FREQ + TOLERANCE
 
-    return is_peak_near_target and base_energy > THRESHOLD_BASE * total_energy and oct_energy > THRESHOLD_OCT * total_energy
-def process_chunk(filename):
-    input_path = os.path.join(INPUT_DIR, filename)
-    print(f"🔍 Processing {input_path}...")
-    # frequency analysis and event detection
-    data, samplerate = sf.read(input_path)
-    if data.ndim > 1:
-        data = data[:, 0]  # channel 1 only
-    chunk_samples = int(CHUNK_SECONDS * samplerate)
-    skip_samples = int(SKIP_SECONDS * samplerate)
-    padding_before = int(CLIP_PADDING_BEFORE * samplerate)
-    padding_after = int(CLIP_PADDING_AFTER * samplerate)
-    i = 0
-    last_event = -skip_samples
-    while i + chunk_samples <= len(data):
-        chunk = data[i:i+chunk_samples]
-        if i - last_event >= skip_samples and detect_event(chunk, samplerate):
+        event_detected = is_peak_near_target and base_energy > THRESHOLD_BASE * total_energy and oct_energy > THRESHOLD_OCT * total_energy
+        if i - last_event >= skip_samples and event_detected:
             clip_start = max(0, i - padding_before)
             clip_end = min(len(data), i + chunk_samples + padding_after)
             clip = data[clip_start:clip_end]
 
-            chunk_start_str = os.path.splitext(filename)[0]
-            chunk_start_dt = datetime.strptime(chunk_start_str, "%Y%m%d-%H%M%S")
             event_offset = (i - padding_before) / samplerate
             event_time_dt = chunk_start_dt + datetime.timedelta(seconds=event_offset)
             event_time = event_time_dt.strftime("%Y%m%d-%H%M%S")
 
             base_name = os.path.splitext(filename)[0]
-            flac_out = os.path.join(OUTPUT_DIR, f"{base_name}_{event_time}.flac")
-            png_out = os.path.join(OUTPUT_DIR, f"{base_name}_{event_time}.png")
+            flac_out = os.path.join(EVENT_DIR, f"{base_name}_{event_time}.flac")
+            png_out = os.path.join(EVENT_DIR, f"{base_name}_{event_time}.png")
             sf.write(flac_out, clip, samplerate, format='FLAC')
 
             plt.figure()
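As a quick illustration of the naming arithmetic in the strptime/strftime lines above: a made-up chunk file and sample rate (both hypothetical, not taken from the repository) would produce the following event path.

import datetime
import os

filename = "20250530-213000.flac"   # hypothetical chunk recorded at 2025-05-30 21:30:00
samplerate = 48000                  # hypothetical sample rate
i = 25 * samplerate                 # detection window starting 25 s into the chunk
padding_before = 1 * samplerate     # CLIP_PADDING_BEFORE = 1

chunk_start_dt = datetime.datetime.strptime(os.path.splitext(filename)[0], "%Y%m%d-%H%M%S")
event_offset = (i - padding_before) / samplerate
event_time = (chunk_start_dt + datetime.timedelta(seconds=event_offset)).strftime("%Y%m%d-%H%M%S")

print(os.path.join("events", f"{os.path.splitext(filename)[0]}_{event_time}.flac"))
# -> events/20250530-213000_20250530-213024.flac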
@@ -84,24 +85,28 @@ def process_chunk(filename):
             plt.savefig(png_out)
             plt.close()
 
-            print(f"🎯 Event detected at {event_time}, saved: {flac_out}, {png_out}")
+            print(f"Event: {event_time} peak_freq: {int(peak_freq)} base_energy: {int(base_energy)} oct_energy: {int(oct_energy)} total_energy: {int(total_energy)}")
 
             last_event = i
             i += skip_samples
         else:
             i += chunk_samples
 
     # move the file
-    output_path = os.path.join(OUTPUT_DIR, filename)
+    output_path = os.path.join(PROCESSED_CHUNK_DIR, filename)
     #shutil.move(input_path, output_path)
     print(f"✅ Moved to {output_path}")
 
 def main():
-    os.makedirs(OUTPUT_DIR, exist_ok=True)
+    os.makedirs(EVENT_DIR, exist_ok=True)
+    os.makedirs(PROCESSED_CHUNK_DIR, exist_ok=True)
 
-    with concurrent.futures.ProcessPoolExecutor() as executor:
-        files = [f for f in os.listdir(INPUT_DIR) if f.endswith(".flac")]
-        executor.map(process_chunk, files)
+    for file in os.listdir(CHUNK_DIR):
+        if file.endswith(".flac"):
+            process_chunk(file)
+    # with concurrent.futures.ProcessPoolExecutor() as executor:
+    #     files = [f for f in os.listdir(CHUNK_DIR) if f.endswith(".flac")]
+    #     executor.map(process_chunk, files)
 
 if __name__ == "__main__":
     main()
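For reference, the parallel path that this commit keeps only as comments could be re-enabled roughly like this; a sketch using the script's own names (CHUNK_DIR, EVENT_DIR, PROCESSED_CHUNK_DIR, process_chunk), not a drop-in replacement.

import concurrent.futures
import os

def main():
    # names below come from the script above; this is a sketch, not the committed code
    os.makedirs(EVENT_DIR, exist_ok=True)
    os.makedirs(PROCESSED_CHUNK_DIR, exist_ok=True)

    files = [f for f in os.listdir(CHUNK_DIR) if f.endswith(".flac")]
    # one worker process per file task, pool size defaults to the CPU count
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # consuming the iterator re-raises worker exceptions in the parent;
        # a bare executor.map(...) call would let them pass unnoticed
        list(executor.map(process_chunk, files))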