As I restarted my job search (yes, I am still #OpenToWork, ping me!), one of the job applications asked me to implement a prototype that processes video data. While working through the project, I unexpectedly got a lot of help from generative AI chatbots, given my relative inexperience in the area.
As mentioned in the title, ffmpeg was used to perform some preprocessing work. One of the goals of the project was to play multiple video files one after another. While there are multiple ways to achieve that, I decided to go with the most obvious solution: concatenating them.
```shell
$ cat video1 video2 video3 | python further-work.py
```
To achieve that, I first had to re-encode the files into a format that allows it. After “discussing” this with Google Gemini, the chatbot recommended MPEG-TS for the purpose.
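Part of why MPEG-TS suits a plain `cat` is its layout: a transport stream is a sequence of fixed-size 188-byte packets, each starting with the sync byte `0x47`, so appending one stream to another still yields a well-formed packet sequence (players may still see timestamps jump at the seam). Formats like MP4 instead keep a whole-file index, so byte-level concatenation breaks them. The helper below is a rough heuristic sketch, not a real demuxer; the name `looks_like_mpegts` is hypothetical.

```python
TS_PACKET_SIZE = 188  # MPEG-TS packets have a fixed size
SYNC_BYTE = 0x47      # and every packet begins with this byte


def looks_like_mpegts(data: bytes) -> bool:
    """Heuristic: the buffer is whole 188-byte packets, each starting with 0x47."""
    if not data or len(data) % TS_PACKET_SIZE:
        return False
    return all(data[i] == SYNC_BYTE for i in range(0, len(data), TS_PACKET_SIZE))


if __name__ == "__main__":
    packet = bytes([SYNC_BYTE]) + bytes(TS_PACKET_SIZE - 1)
    first, second = packet * 3, packet * 2
    # Concatenating two packet-aligned streams keeps the packet structure intact.
    print(looks_like_mpegts(first + second))
```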
There are other file formats that could be used for the purpose, but they are irrelevant to this discussion. Once the video is re-encoded into this format, the video data is sent to a queue, to be consumed by other modules running in other processes.
After defining both the input (a list of video files to be fetched online) and the output (re-encoded video file content), it was time to figure out how to do it. Unfortunately, ffmpeg is a complicated utility that does a great many things. There have been multiple attempts to provide a friendlier interface to it (I really wanted to try one of them, but apparently it is dead now). However, with how helpful generative AI is these days, getting the right command is just a few prompts away.
```shell
ffmpeg -hwaccel cuda -i pipe:0 -c:v h264_nvenc -b:v 1.5M -c:a aac -b:a 128k -f mpegts -y pipe:1
```
It even explained what each of those arguments means, as shown in the screenshot below.
In short, the command accepts video file content through stdin (`pipe:0`) and writes the re-encoded video file content to stdout (`pipe:1`).
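The same command reappears later as a list of arguments for `asyncio.create_subprocess_exec`. A small builder makes that mapping explicit and annotates each flag. The `use_gpu` toggle is a hypothetical addition, inspired by the commented-out `libx264` line in the author's code: it drops `-hwaccel cuda` and swaps the NVIDIA encoder for the CPU one. Treat it as a sketch, not a tested ffmpeg configuration.

```python
import shlex


def ffmpeg_args(use_gpu: bool = True) -> list[str]:
    """Build the ffmpeg argument list; `use_gpu` is a hypothetical toggle."""
    args = ["ffmpeg"]
    if use_gpu:
        args += ["-hwaccel", "cuda"]  # GPU decoding; an input option, so it precedes -i
    args += [
        "-i", "pipe:0",  # read the input from stdin
        "-c:v", "h264_nvenc" if use_gpu else "libx264",  # H.264 on the GPU or the CPU
        "-b:v", "1.5M",  # target video bitrate
        "-c:a", "aac",  # audio codec
        "-b:a", "128k",  # audio bitrate
        "-f", "mpegts",  # container: MPEG transport stream
        "-y",  # overwrite the output without asking
        "pipe:1",  # write the output to stdout
    ]
    return args


if __name__ == "__main__":
    command = (
        "ffmpeg -hwaccel cuda -i pipe:0 -c:v h264_nvenc -b:v 1.5M "
        "-c:a aac -b:a 128k -f mpegts -y pipe:1"
    )
    # The GPU variant is exactly the shell command split into arguments.
    print(ffmpeg_args() == shlex.split(command))
```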
Now it is time to code the implementation. As I wanted to both read from and write to ffmpeg concurrently, this is going to be an asyncio application. The HTTP client library we are using this time is httpx, which can stream a download in smaller chunks:
```python
import asyncio

import httpx

client = httpx.AsyncClient()


async def write_input(
    client: httpx.AsyncClient, video_link: str, process: asyncio.subprocess.Process
) -> None:
    async with client.stream("GET", video_link) as response:
        async for chunk in response.aiter_raw(1024):
            print(chunk)  # this is the downloaded video file, in chunks
```
We will worry about the actual processing later; for now, the code just prints the chunks to the screen.
Next, we write a function to call ffmpeg through `asyncio.create_subprocess_exec`:
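```python
import asyncio
import subprocess

import httpx

# Assumed: `logger` is a structlog-style logger (it takes keyword arguments), defined elsewhere.


async def video_send(client: httpx.AsyncClient, video_link: str) -> None:
    logger.info("DATA: Fetching video from link", link=video_link)
    process = await asyncio.create_subprocess_exec(
        "ffmpeg",
        "-hwaccel",
        "cuda",
        "-i",
        "pipe:0",
        "-c:v",
        "h264_nvenc",
        # "libx264",
        "-b:v",
        "1.5M",
        "-c:a",
        "aac",
        "-b:a",
        "128k",
        "-f",
        "mpegts",
        "-y",
        "pipe:1",
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    # Keep a reference: the event loop only holds weak references to tasks,
    # so an unreferenced task may be garbage-collected mid-execution.
    writer_task = asyncio.create_task(write_input(client, video_link, process))
```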
Ideally, we would use `process.communicate(file_content)` here, as advised in the documentation. Unfortunately, that would require downloading the whole file first, and we would only get the re-encoded output back once ffmpeg exits, which would inevitably delay the response.
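For comparison, here is roughly what the rejected `communicate()` route looks like. To keep it runnable without ffmpeg or a GPU, a small Python child process that echoes stdin back to stdout stands in for the real command (an assumption purely for illustration). It is simple and deadlock-free, but the whole input must be in memory up front and nothing comes back until the child has exited.

```python
import asyncio
import sys

# Stand-in for the ffmpeg command (assumption): echoes stdin back on stdout.
ECHO = [sys.executable, "-c", "import sys; sys.stdout.buffer.write(sys.stdin.buffer.read())"]


async def reencode_buffered(file_content: bytes) -> bytes:
    """The communicate() route: whole input up front, whole output at the end."""
    process = await asyncio.create_subprocess_exec(
        *ECHO, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    # communicate() feeds stdin and drains stdout concurrently internally,
    # so it cannot deadlock, but it only returns after the child exits.
    stdout, _ = await process.communicate(file_content)
    return stdout


if __name__ == "__main__":
    payload = bytes(range(256)) * 1024  # the entire "download" must already be here
    print(len(asyncio.run(reencode_buffered(payload))))
```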
Instead, we can use `process.stdin.write()`. Let’s update the original `write_input` function:
```python
async def write_input(
    client: httpx.AsyncClient, video_link: str, process: asyncio.subprocess.Process
) -> None:
    assert isinstance(process.stdin, asyncio.StreamWriter)
    async with client.stream("GET", video_link) as response:
        logger.info("DATA: Streaming video to queue", link=video_link)
        async for chunk in response.aiter_raw(1024):
            process.stdin.write(chunk)
            await process.stdin.drain()  # wait until the pipe has room again
    # Signal the end of input so ffmpeg can finish encoding and exit.
    if process.stdin.can_write_eof():
        process.stdin.write_eof()
    process.stdin.close()
    await process.stdin.wait_closed()
    logger.info("DATA: Done downloading video to ffmpeg")
```
In short:

- With every downloaded chunk, we feed it to the process through `process.stdin.write(chunk)`, then `await process.stdin.drain()`, which pauses us whenever the pipe buffer is full.
- Once done, we write an EOF (`process.stdin.write_eof()`) to denote the end of the file input,
- followed by a `.close()` (and the corresponding `await .wait_closed()`).
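One thing the steps above leave open is what happens if the download fails halfway: ffmpeg would never see EOF and could wait forever. A possible variation, sketched below, moves the EOF/close steps into a `finally` block. The names `feed_stdin` and `fake_download` are hypothetical: `fake_download` stands in for `response.aiter_raw(1024)`, and an echoing Python child stands in for ffmpeg so the sketch runs anywhere.

```python
import asyncio
import sys
from collections.abc import AsyncIterable, AsyncIterator

# Stand-in for ffmpeg (assumption): a child that echoes stdin back on stdout.
ECHO = [sys.executable, "-c", "import sys; sys.stdout.buffer.write(sys.stdin.buffer.read())"]


async def feed_stdin(process: asyncio.subprocess.Process, chunks: AsyncIterable[bytes]) -> None:
    """Stream chunks into the child's stdin, and always signal EOF afterwards."""
    stdin = process.stdin
    assert stdin is not None
    try:
        async for chunk in chunks:
            stdin.write(chunk)
            await stdin.drain()  # back-pressure: wait while the pipe is full
    finally:
        # Runs even if the download raises, so the child is never left
        # waiting for input that will not arrive.
        if stdin.can_write_eof():
            stdin.write_eof()
        stdin.close()
        await stdin.wait_closed()


async def fake_download(fail: bool = False) -> AsyncIterator[bytes]:
    """Hypothetical stand-in for `response.aiter_raw(1024)`."""
    yield b"abc"
    yield b"def"
    if fail:
        raise ConnectionError("download dropped")  # simulate a broken download
    yield b"ghi"


async def demo(fail: bool = False) -> tuple[bytes, int]:
    process = await asyncio.create_subprocess_exec(
        *ECHO, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    try:
        await feed_stdin(process, fake_download(fail))
    except ConnectionError:
        pass  # the child still received EOF thanks to the `finally` block
    assert process.stdout is not None
    output = await process.stdout.read()  # reading after writing is fine for a tiny payload
    return output, await process.wait()


if __name__ == "__main__":
    print(asyncio.run(demo()))
    print(asyncio.run(demo(fail=True)))
```

In the failing case the child still exits cleanly with whatever data arrived before the error.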
Back to the `video_send` function: we continue it by reading from `process.stdout`. Being able to read and write at the same time is exactly why we are doing this through asyncio. In a synchronous setting, we could only do one after another in a fixed order, and writing everything before reading anything can even deadlock: once ffmpeg fills its stdout pipe buffer, it stops consuming stdin, and our writes stall too. Now we can let the event loop worry about the order. The function gets the following code added for reading the re-encoded file content and posting it to the queue:
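```python
from multiprocessing import Queue  # the process-safe queue


async def video_send(queue: Queue, client: httpx.AsyncClient, video_link: str) -> None:
    ...  # spawn ffmpeg and schedule write_input() as shown earlier
    assert isinstance(process.stdout, asyncio.StreamReader)
    while True:
        chunk = await process.stdout.read(1024)
        if not chunk:  # b"" means ffmpeg closed its stdout (EOF)
            break
        # queue.put() is a blocking call, so run it in a worker thread
        await asyncio.to_thread(queue.put, chunk)
    await process.wait()
    logger.info("DATA: Done sending video to queue")
```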
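In a loop, we

- fetch a `chunk` of data from ffmpeg's stdout,
- if `chunk` is empty (`b""`, which only happens once ffmpeg closes its stdout), break from the loop,
- otherwise, push the `chunk` to the queue (through `asyncio.to_thread`, as we are using the process-safe version here, whose `put()` can block, for example when the queue is full, and would otherwise stall the event loop),
- then, after the loop, we wait for the command to exit gracefully through `process.wait()`.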
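Putting both halves together, the sketch below pushes 256 KiB through a child process with a writer and a reader running side by side. That is more than a typical pipe buffer holds, so a write-everything-then-read approach could stall here. Again, a Python passthrough child stands in for ffmpeg, and a generic `put` callable stands in for `queue.put`; both are assumptions so the example runs anywhere, and the name `pump` is hypothetical.

```python
import asyncio
import sys
from collections.abc import Callable

# Stand-in for the ffmpeg command (assumption): streams stdin to stdout unchanged.
PASSTHROUGH = [
    sys.executable,
    "-c",
    "import shutil, sys; shutil.copyfileobj(sys.stdin.buffer, sys.stdout.buffer)",
]


async def pump(data: bytes, put: Callable[[bytes], object], chunk_size: int = 1024) -> int:
    """Feed `data` to the child while concurrently forwarding its output to `put`."""
    process = await asyncio.create_subprocess_exec(
        *PASSTHROUGH, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    stdin, stdout = process.stdin, process.stdout
    assert stdin is not None and stdout is not None

    async def writer() -> None:
        try:
            for start in range(0, len(data), chunk_size):
                stdin.write(data[start : start + chunk_size])
                await stdin.drain()  # pause while the pipe buffer is full
        finally:
            stdin.close()  # the child sees EOF once its stdin pipe closes
            await stdin.wait_closed()

    async def reader() -> None:
        # read() returns b"" only at EOF, i.e. after the child closes stdout.
        while chunk := await stdout.read(chunk_size):
            # A potentially blocking put (e.g. a full multiprocessing.Queue)
            # runs in a worker thread so it cannot stall the event loop.
            await asyncio.to_thread(put, chunk)

    # Both coroutines make progress together, so neither side waits forever
    # on a full pipe.
    await asyncio.gather(writer(), reader())
    return await process.wait()


if __name__ == "__main__":
    payload = bytes(range(256)) * 1024  # 256 KiB
    received: list[bytes] = []
    exit_code = asyncio.run(pump(payload, received.append))
    print(exit_code, b"".join(received) == payload)
```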
It seems very straightforward now, but it took me the whole night to get this done correctly (and I was still revising the code while writing this). Half the time I was checking the documentation to ensure I wasn’t missing anything; the other half I was getting Gemini to review my code.
Hopefully you find this useful. That’s it for today; with luck, we will get back to the previously promised Advent of Code content next week.