Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pyca/capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,14 @@ def start_capture(db, upcoming_event):
elif config('agent', 'backup_mode'):
state = Status.PAUSED_AFTER_RECORDING
else:
state = Status.FINISHED_RECORDING
#Tag as:
# must ingest automatically after recording
# or must pause ingest until ask for it
if config('ingest', 'delay_min') >= 0 and \
config('ingest', 'delay_max') >= config('ingest', 'delay_min'):
state = Status.FINISHED_RECORDING
else:
state = Status.PAUSED_AFTER_RECORDING

logger.info("Set %s to %s", event.uid, Status.str(state))
update_event_status(event, state)
Expand Down
50 changes: 35 additions & 15 deletions pyca/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from pyca.utils import http_request, service, set_service_status
from pyca.utils import set_service_status_immediate, recording_state
from pyca.utils import update_event_status, terminate
from xml.etree import ElementTree
import logging
import pycurl
import random
Expand Down Expand Up @@ -39,9 +40,14 @@ def get_config_params(properties):
return wdef, param


def ingest(event):
def ingest(event, force_uid=False):
'''Ingest a finished recording to the Opencast server.
'''

Args:
event (_type_): event data
force_uid (bool, optional): must set the current event UID in
the new mediapackage. Defaults to False.
'''
# Update status
set_service_status(Service.INGEST, ServiceStatus.BUSY)
notify.notify('STATUS=Uploading')
Expand All @@ -61,6 +67,17 @@ def ingest(event):
logger.info('Creating new mediapackage')
mediapackage = http_request(service_url + '/createMediaPackage', timeout=0)

# if force_uid, change in the mediapackage the uid by the event uid
if force_uid:
msg = f'Forcing uid to {event.uid} in the mediapackage'
logger.info(msg)
xml_string = mediapackage.decode('utf-8')
xml_root = ElementTree.fromstring(xml_string)
xml_root.set('id', event.uid)
new_mediapackage = ElementTree.tostring(
xml_root, encoding='utf-8', xml_declaration=True)
mediapackage = new_mediapackage

# extract workflow_def, workflow_config and add DC catalogs
prop = 'org.opencastproject.capture.agent.properties'
dcns = 'http://www.opencastproject.org/xsd/1.0/dublincore/'
Expand Down Expand Up @@ -139,21 +156,24 @@ def control_loop():
set_service_status_immediate(Service.INGEST, ServiceStatus.IDLE)
notify.notify('READY=1')
notify.notify('STATUS=Running')
# The true sense of the delay values define the autoingest mode status
autoingesting_mode = config('ingest', 'delay_max') >= config('ingest', 'delay_min')
while not terminate():
notify.notify('WATCHDOG=1')
# Get next recording
session = get_session()
event = session.query(RecordedEvent)\
.filter(RecordedEvent.status ==
Status.FINISHED_RECORDING).first()
if event:
# nosec: we do not need a secure random number here
delay = random.randint(config('ingest', 'delay_min'), # nosec
config('ingest', 'delay_max'))
logger.info("Delaying ingest for %s seconds", delay)
time.sleep(delay)
safe_start_ingest(event)
session.close()
if autoingesting_mode:
# Get next recording
session = get_session()
event = session.query(RecordedEvent)\
.filter(RecordedEvent.status ==
Status.FINISHED_RECORDING).first()
if event:
# nosec: we do not need a secure random number here
delay = random.randint(config('ingest', 'delay_min'), # nosec
config('ingest', 'delay_max'))
logger.info('Delaying ingest for %s seconds', delay)
time.sleep(delay)
safe_start_ingest(event)
session.close()
time.sleep(1.0)
logger.info('Shutting down ingest service')
set_service_status(Service.INGEST, ServiceStatus.STOPPED)
Expand Down
28 changes: 28 additions & 0 deletions pyca/ui/jsonapi.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
# -*- coding: utf-8 -*-
from flask import jsonify, make_response, request
from pyca.config import config
from pyca.ingest import ingest
from pyca.db import Service, ServiceStatus, UpcomingEvent, \
RecordedEvent, UpstreamState
from pyca.db import with_session, Status, ServiceStates
from pyca.ui import app
from pyca.ui.utils import requires_auth, jsonapi_mediatype
from pyca.ui.opencast_commands import schedule
from pyca.utils import get_service_status, ensurelist, timestamp
from pyca.utils import update_event_status, recording_state, set_service_status_immediate
import logging
import os
import psutil
Expand Down Expand Up @@ -181,6 +183,32 @@ def modify_event(db, uid):
return make_data_response(event.serialize())


@app.route('/api/ingest/<uid>', methods=['POST'])
@requires_auth
@jsonapi_mediatype
@with_session
def ingest_event(db, uid):
'''Ingest an event specified by its uid.

Note that this method works for recorded events only.
Returns 204 if the action was successful.
Returns 404 if event does not exist
'''
event = db.query(RecordedEvent).filter(RecordedEvent.uid == uid).first()
if not event:
return make_error_response('No event with specified uid', 404)
try:
ingest(event, force_uid=True)
return make_data_response(event.serialize(), status=204)
except Exception as e:
logger.exception(f'Something went wrong during the upload: {e}')
# Update state if something went wrong
recording_state(event.uid, 'upload_error')
update_event_status(event, Status.FAILED_UPLOADING)
set_service_status_immediate(Service.INGEST, ServiceStatus.IDLE)
return make_error_response('upload_error', 400)


@app.route('/api/metrics', methods=['GET'])
@requires_auth
@jsonapi_mediatype
Expand Down