|
from multiprocessing import Process, connection |
|
from typing import Any, Dict, Optional, List, Deque |
|
from collections import deque |
|
import pyaudio |
|
import io |
|
import wave |
|
|
|
|
|
class AudioRecorder:
    """Capture microphone audio in a child process and stream it over a pipe.

    Audio is read in fixed-length windows of ``RECORD_SECONDS`` each. The
    recorder keeps the most recent windows (as many as ``audio_chunks`` can
    hold) and, once that buffer is full, sends them through ``output_pipe``
    after every new window as ONE valid in-memory WAV file. Consecutive
    messages therefore overlap by all but the newest window.
    """

    # Upper bound (seconds) on how long stop_recording() waits for the child
    # process to exit after terminate(), so a stuck child cannot hang the caller.
    _STOP_TIMEOUT_SECONDS = 5.0

    def __init__(
        self,
        output_pipe: connection.Connection,
        input_device_index: Optional[int] = None,
    ):
        """Configure the recorder; no audio device is opened here.

        Args:
            output_pipe: Connection end on which WAV byte strings are sent.
            input_device_index: PyAudio input device index, or ``None`` to
                pass no explicit device to ``PyAudio.open``.
        """
        self.CHUNK: int = 1024  # frames requested per stream.read() call
        self.FORMAT: int = pyaudio.paInt16
        self.CHANNELS: int = 1
        self.RATE: int = 44100  # sampling rate in Hz
        self.RECORD_SECONDS: int = 1  # nominal length of one window
        self.recording_process: Optional[Process] = None
        # Raw PCM bytes of the most recent windows (no WAV header); they are
        # wrapped into a single WAV file only when a message is sent.
        self.audio_chunks: Deque[bytes] = deque(maxlen=2)
        self.output_pipe: connection.Connection = output_pipe
        self.input_device_index: Optional[int] = input_device_index

    @staticmethod
    def list_microphones() -> List[Dict[str, Any]]:
        """Return the PyAudio device-info dict of every input-capable device.

        A device is included when its ``maxInputChannels`` entry is greater
        than zero.
        """
        p = pyaudio.PyAudio()
        try:
            all_devices = (
                p.get_device_info_by_index(i) for i in range(p.get_device_count())
            )
            return [info for info in all_devices if info["maxInputChannels"] > 0]
        finally:
            # Release PortAudio even when a device query raises.
            p.terminate()

    @staticmethod
    def _encode_wav(
        frames: List[bytes], channels: int, sample_width: int, rate: int
    ) -> bytes:
        """Wrap raw PCM byte strings in a single in-memory WAV container."""
        wav_buffer = io.BytesIO()
        with wave.open(wav_buffer, "wb") as wf:
            wf.setnchannels(channels)
            wf.setsampwidth(sample_width)
            wf.setframerate(rate)
            wf.writeframes(b"".join(frames))
        return wav_buffer.getvalue()

    def create_wav_bytes(self, frames: List[bytes]) -> bytes:
        """Convert raw audio frames to WAV format in memory.

        Args:
            frames: Raw PCM byte strings, concatenated in order.

        Returns:
            The bytes of a complete WAV file using this recorder's channel
            count, sample format and rate.
        """
        return self._encode_wav(
            frames,
            self.CHANNELS,
            pyaudio.get_sample_size(self.FORMAT),
            self.RATE,
        )

    def _read_window(self, p: "pyaudio.PyAudio") -> bytes:
        """Open an input stream, read one window of raw PCM, close the stream."""
        stream = p.open(
            format=self.FORMAT,
            channels=self.CHANNELS,
            rate=self.RATE,
            input=True,
            input_device_index=self.input_device_index,
            frames_per_buffer=self.CHUNK,
        )
        frames: List[bytes] = []
        try:
            for _ in range(int(self.RATE / self.CHUNK * self.RECORD_SECONDS)):
                try:
                    frames.append(
                        stream.read(self.CHUNK, exception_on_overflow=False)
                    )
                except OSError as e:
                    # Best effort: a failed read is reported and skipped, so
                    # the window may come out shorter than nominal.
                    print(f"Warning: Audio input overflow occurred: {e}")
        finally:
            # Always release the stream, even if reading failed unexpectedly.
            try:
                stream.stop_stream()
            finally:
                stream.close()
        return b"".join(frames)

    def record_audio(self) -> None:
        """Record windows forever and send the sliding buffer over the pipe.

        Intended as the target of the recording process. It only returns by
        raising; in normal use the process is ended by ``stop_recording``.
        """
        p = pyaudio.PyAudio()
        try:
            while True:
                self.audio_chunks.append(self._read_window(p))

                # Send only once the buffer is full. The windows are encoded
                # together as ONE WAV file: concatenating separately encoded
                # WAV files would embed a second header in the audio data and
                # leave the first header describing only the first window.
                if len(self.audio_chunks) == self.audio_chunks.maxlen:
                    self.output_pipe.send(
                        self.create_wav_bytes(list(self.audio_chunks))
                    )
        finally:
            p.terminate()

    def start_recording(self) -> None:
        """Start recording in a separate process.

        Does nothing if a recording process started by this object is still
        alive, so repeated calls cannot leave two recorders writing to the
        same pipe.
        """
        current = self.recording_process
        if current is not None and current.is_alive():
            return
        self.recording_process = Process(target=self.record_audio)
        self.recording_process.start()

    def stop_recording(self) -> None:
        """Stop recording by terminating the recording process, if any."""
        process = self.recording_process
        if process is None:
            return
        self.recording_process = None
        # is_alive() is False for a process that was never started or that has
        # already exited; terminate()/join() are only valid on a started one.
        if process.is_alive():
            process.terminate()
            # Reap the child so it does not linger as a zombie.
            process.join(timeout=self._STOP_TIMEOUT_SECONDS)
|
|