test-buffer.py 730 Bytes
Newer Older
Yoann HOUPERT's avatar
Yoann HOUPERT committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst

GObject.threads_init()
Gst.init(None)

appsrc = Gst.ElementFactory.make("appsrc", "appsrc")
filesink = Gst.ElementFactory.make("filesink", "filesink")
filesink.set_property("location", "test.dat")

pipeline = Gst.Pipeline()
pipeline.add(appsrc)
pipeline.add(filesink)
appsrc.link(filesink)
pipeline.set_state(Gst.State.PLAYING)

data = "1234" * 12
print "Using data: %s" % data

buf = Gst.Buffer.new_allocate(None, len(data), None)
buf.fill(0, data)
#for (i, c) in enumerate(data):
#    buf.memset(i, c, 1)
appsrc.emit("push-buffer", buf)

pipeline.send_event(Gst.Event.new_eos())

result = open("test.dat").read()

print "Result    : %s" % result