2 回答

TA貢獻1829條經驗 獲得超7個贊
正如這些看似簡單的問題似乎總是如此,魔鬼在細節中。我最終寫了一些代碼來解決這個問題。pip 依賴項可以使用python3 -m pip install ffmpeg-python PyOpenAL
. 代碼的工作流程可以分為兩步:
代碼必須從在線流中下載 mp3 文件數據的二進制塊,并將它們轉換為原始 PCM 數據(基本上是簽名的 uint16_t 幅度值)以進行播放。這是使用ffmpeg-python 庫完成的,它是FFmpeg的包裝器。此包裝器在單獨的進程中運行 FFmpeg,因此此處不會發生阻塞。
然后代碼必須將這些塊排隊以進行播放。這是使用PyOpenAL完成的,它是OpenAL的包裝器。在創建設備和上下文以啟用音頻播放后,將創建一個 3d 定位的源。這個源不斷地與緩沖區(模擬“環形緩沖區”)一起排隊,這些緩沖區填充了從 FFmpeg 輸入的數據。這與第一步在單獨的線程上運行,使下載新音頻塊獨立于音頻塊播放運行。
這是該代碼的樣子(帶有一些注釋)。如果您對代碼或此答案的任何其他部分有任何疑問,請告訴我。
import ctypes
import ffmpeg
import numpy as np
from openal.al import *
from openal.alc import *
from queue import Queue, Empty
from threading import Thread
import time
from urllib.request import urlopen
def init_audio():
#Create an OpenAL device and context.
device_name = alcGetString(None, ALC_DEFAULT_DEVICE_SPECIFIER)
device = alcOpenDevice(device_name)
context = alcCreateContext(device, None)
alcMakeContextCurrent(context)
return (device, context)
def create_audio_source():
#Create an OpenAL source.
source = ctypes.c_uint()
alGenSources(1, ctypes.pointer(source))
return source
def create_audio_buffers(num_buffers):
#Create a ctypes array of OpenAL buffers.
buffers = (ctypes.c_uint * num_buffers)()
buffers_ptr = ctypes.cast(
ctypes.pointer(buffers),
ctypes.POINTER(ctypes.c_uint),
)
alGenBuffers(num_buffers, buffers_ptr)
return buffers_ptr
def fill_audio_buffer(buffer_id, chunk):
#Fill an OpenAL buffer with a chunk of PCM data.
alBufferData(buffer_id, AL_FORMAT_STEREO16, chunk, len(chunk), 44100)
def get_audio_chunk(process, chunk_size):
#Fetch a chunk of PCM data from the FFMPEG process.
return process.stdout.read(chunk_size)
def play_audio(process):
#Queues up PCM chunks for playing through OpenAL
num_buffers = 4
chunk_size = 8192
device, context = init_audio()
source = create_audio_source()
buffers = create_audio_buffers(num_buffers)
#Initialize the OpenAL buffers with some chunks
for i in range(num_buffers):
buffer_id = ctypes.c_uint(buffers[i])
chunk = get_audio_chunk(process, chunk_size)
fill_audio_buffer(buffer_id, chunk)
#Queue the OpenAL buffers into the OpenAL source and start playing sound!
alSourceQueueBuffers(source, num_buffers, buffers)
alSourcePlay(source)
num_used_buffers = ctypes.pointer(ctypes.c_int())
while True:
#Check if any buffers are used up/processed and refill them with data.
alGetSourcei(source, AL_BUFFERS_PROCESSED, num_used_buffers)
if num_used_buffers.contents.value != 0:
used_buffer_id = ctypes.c_uint()
used_buffer_ptr = ctypes.pointer(used_buffer_id)
alSourceUnqueueBuffers(source, 1, used_buffer_ptr)
chunk = get_audio_chunk(process, chunk_size)
fill_audio_buffer(used_buffer_id, chunk)
alSourceQueueBuffers(source, 1, used_buffer_ptr)
if __name__ == "__main__":
url = "http://icecast.spc.org:8000/longplayer"
#Run FFMPEG in a separate process using subprocess, so it is non-blocking
process = (
ffmpeg
.input(url)
.output("pipe:", format='s16le', acodec='pcm_s16le', ac=2, ar=44100, loglevel="quiet")
.run_async(pipe_stdout=True)
)
#Run audio playing OpenAL code in a separate thread
thread = Thread(target=play_audio, args=(process,), daemon=True)
thread.start()
#Some example code to show that this is not being blocked by the audio.
start = time.time()
while True:
print(time.time() - start)

TA貢獻1813條經驗 獲得超2個贊
使用pyminiaudio : (它提供了一個 icecast 流源類):
import miniaudio
def title_printer(client: miniaudio.IceCastClient, new_title: str) -> None:
print("Stream title: ", new_title)
with miniaudio.IceCastClient("http://icecast.spc.org:8000/longplayer",
update_stream_title=title_printer) as source:
print("Connected to internet stream, audio format:", source.audio_format.name)
print("Station name: ", source.station_name)
print("Station genre: ", source.station_genre)
print("Press <enter> to quit playing.\n")
stream = miniaudio.stream_any(source, source.audio_format)
with miniaudio.PlaybackDevice() as device:
device.start(stream)
input() # wait for user input, stream plays in background
添加回答
舉報