1414
1515package com .adobe .api .platform .runtime .mesos
1616
17+ import akka .Done
1718import akka .actor .Actor
1819import akka .actor .ActorLogging
1920import akka .actor .ActorRef
2021import akka .actor .ActorSystem
2122import akka .actor .Cancellable
23+ import akka .actor .CoordinatedShutdown
2224import akka .actor .Props
25+ import akka .dispatch .Futures
2326import akka .event .LoggingAdapter
2427import akka .http .scaladsl .model .HttpResponse
2528import akka .pattern .ask
@@ -140,7 +143,10 @@ case class Failed(taskId: String, agentId: String) extends TaskState
140143
/**
 * Configuration for the Mesos client actor.
 *
 * @param agentStatsTTL how long a per-agent offer-stats sample remains valid before it expires
 * @param agentStatsPruningPeriod interval between PruneStats self-messages
 * @param failPendingOfferCycles if set, a pending task is failed with CapacityFailure after this
 *                               many offer cycles without a match
 * @param holdOffers when true, usable-but-unused offers are held for later matching instead of
 *                   being declined immediately
 * @param holdOffersTTL how long a held offer may be retained before it is declined
 * @param holdOffersPruningPeriod interval between PruneHeldOffers self-messages
 */
case class MesosActorConfig(agentStatsTTL: FiniteDuration,
                            agentStatsPruningPeriod: FiniteDuration,
                            failPendingOfferCycles: Option[Int],
                            holdOffers: Boolean,
                            holdOffersTTL: FiniteDuration,
                            holdOffersPruningPeriod: FiniteDuration)
// most recently offered capacity observed for a single agent, with the instant this sample expires
case class AgentStats(mem: Double, cpu: Double, ports: Int, expiration: Instant)

// snapshot of agent stats keyed by agent hostname, published to the optional listener
case class MesosAgentStats(stats: Map[String, AgentStats])
@@ -151,6 +157,8 @@ case class CapacityFailure(requiredMem: Float,
151157 remainingResources : List [(Float , Float , Int )])
152158 extends MesosException (" cluster does not have capacity" )
153159
// an offer retained (not declined) for later task matching, and the instant at which it must be released
case class HeldOffer(offer: Offer, expiration: Instant)
154162// TODO: mesos authentication
155163trait MesosClientActor extends Actor with ActorLogging with MesosClientConnection {
156164 implicit val ec : ExecutionContext = context.dispatcher
@@ -173,6 +181,9 @@ trait MesosClientActor extends Actor with ActorLogging with MesosClientConnectio
  // abstract: maximum heartbeat failures allowed, supplied by the concrete implementation
  val heartbeatMaxFailures: Int
  // handle for the scheduled heartbeat check; cancelled in postStop
  var heartbeatMonitor: Option[Cancellable] = None
  // heartbeat failure counter — NOTE(review): increment/reset logic lives outside this chunk
  var heartbeatFailures: Int = 0
  // offers retained instead of declined, keyed by offer id (only populated when config.holdOffers)
  var heldOffers: Map[OfferID, HeldOffer] = Map.empty
  // true while a MatchHeldOffers message is already in flight, so SubmitTask signals at most once per cycle
  var pendingHeldOfferMatch: Boolean = false
  // set by the CoordinatedShutdown hook; while true, offers are declined rather than held
  var stopping: Boolean = false

  var agentOfferHistory = Map.empty[String, AgentStats] // Map[<agent hostname> -> <stats>] track the most recent offer stats per agent hostname
  // optional subscriber that is sent MesosAgentStats after offer and prune cycles
  val listener: Option[ActorRef]
@@ -200,14 +211,21 @@ trait MesosClientActor extends Actor with ActorLogging with MesosClientConnectio
200211 })
201212 }
  // self-message: prune expired entries from agentOfferHistory (handled by this actor to avoid concurrent map updates)
  case object PruneStats
  // self-message: decline the single oldest expired held offer (one offer per pruning cycle)
  case object PruneHeldOffers
  // self-message: decline all currently held offers (sent from the shutdown hook)
  case object ReleaseHeldOffers
  case object MatchHeldOffers // used when submitting tasks, and periodically to refresh nodestats based on held offers
203217
204218 override def preStart () = {
205- actorSystem.scheduler.schedule(30 .seconds, config.agentStatsPruningPeriod, context.actorOf(Props (new Actor {
206- override def receive : Receive = {
207- case PruneStats =>
208- context.parent ! PruneStats // client actor needs to handle PruneStats to avoid concurrent update to stats map
219+ actorSystem.scheduler.schedule(30 .seconds, config.agentStatsPruningPeriod, self, PruneStats )
220+ if (config.holdOffers) {
221+ actorSystem.scheduler.schedule(30 .seconds, config.holdOffersPruningPeriod, self, PruneHeldOffers )
222+ // setup shutdown hook for releasing held offers
223+ CoordinatedShutdown (actorSystem).addTask(CoordinatedShutdown .PhaseBeforeServiceUnbind , " releaseOffers" ) { () =>
224+ stopping = true
225+ implicit val timeout = Timeout (5 .seconds)
226+ (self ? ReleaseHeldOffers ).map(_ => Done )
209227 }
210- })), PruneStats )
228+ }
211229 }
212230 // cache the framework id, so that in case this actor restarts we can reconnect
213231 if (MesosClient .frameworkID.isEmpty) MesosClient .frameworkID = Some (FrameworkID .newBuilder().setValue(id()).build())
@@ -236,6 +254,14 @@ trait MesosClientActor extends Actor with ActorLogging with MesosClientConnectio
236254 case SubmitTask (task) => {
237255 val taskPromise = Promise [Running ]()
238256 tasks.update(task.taskId, SubmitPending (task, taskPromise))
257+ // if we are allowed to hold offers, signal use of those immediately, but only if not already signaled
258+ val now = Instant .now()
259+ if (config.holdOffers && heldOffers.nonEmpty && taskFitsHeldOffers(task) && ! pendingHeldOfferMatch) {
260+ pendingHeldOfferMatch = true
261+ // don't do offer matching here, or send the held offers (which may go away), rather trigger a separate offer cycle
262+ // that only uses the held offers
263+ self ! MatchHeldOffers
264+ }
239265 taskPromise.future.pipeTo(sender())
240266 }
241267 case DeleteTask (taskId) => {
@@ -289,6 +315,41 @@ trait MesosClientActor extends Actor with ActorLogging with MesosClientConnectio
289315 // publish MesosAgentStats to subscribers
290316 listener.foreach(_ ! MesosAgentStats (agentOfferHistory))
291317 }
318+ case PruneHeldOffers =>
319+ // prune one offers that are > TTL old (will remove and trigger a new offer, so don't prune all at once)
320+ val now = Instant .now()
321+ val expired = heldOffers.filter(_._2.expiration.isBefore(now))
322+ if (expired.nonEmpty) {
323+ // prune only the oldest one per pruning (so that we retain some held offers at all times, if possible)
324+ val expiredOffer = expired.minBy(_._2.expiration)._1
325+ declineOffers(Seq (expiredOffer))
326+ heldOffers = heldOffers - expiredOffer
327+ logger.info(s " pruned 1 held offers " )
328+ }
329+ case ReleaseHeldOffers =>
330+ if (heldOffers.nonEmpty) {
331+ logger.info(s " declining (on shutdown) ${heldOffers.size} held offers " )
332+ val declineCall = Call .newBuilder
333+ .setFrameworkId(frameworkID)
334+ .setType(Call .Type .DECLINE )
335+ .setDecline(
336+ Call .Decline .newBuilder
337+ .addAllOfferIds(heldOffers.keys.asJava)
338+ .build)
339+ .build()
340+ execInternal(declineCall).pipeTo(sender())
341+ heldOffers = Map .empty
342+ } else {
343+ logger.info(" no offers held (on shutdown)" )
344+ sender() ! Future .successful({})
345+ }
346+ case MatchHeldOffers =>
347+ pendingHeldOfferMatch = false // reset flag to allow signaling again
348+ if (heldOffers.nonEmpty) {
349+ log.info(s " attempting to use ${heldOffers.size} held offers " )
350+ val heldOffersMessage = Event .Offers .newBuilder().addAllOffers(heldOffers.values.map(_.offer).asJava).build()
351+ handleOffers(heldOffersMessage)
352+ }
292353 case msg => log.warning(s " unknown msg: ${msg}" )
293354 }
294355
@@ -315,8 +376,10 @@ trait MesosClientActor extends Actor with ActorLogging with MesosClientConnectio
315376 case MesosTaskState .TASK_STAGING | MesosTaskState .TASK_STARTING =>
316377 log.info(
317378 s " task still launching task ${event.getStatus.getTaskId.getValue} (in state ${event.getStatus.getState}" )
318- case _ =>
319- log.warning(s " failing task ${event.getStatus.getTaskId.getValue} msg: ${event.getStatus.getMessage}" )
379+ case t =>
380+ log.warning(
381+ s " failing task ${event.getStatus.getTaskId.getValue} exception: ${t} msg: ${event.getStatus.getMessage}" )
382+ // if (event.getStatus.getMessage == "Container exited with status 125"
320383 promise.failure(
321384 new MesosException (s " task in state ${event.getStatus.getState} msg: ${event.getStatus.getMessage}" ))
322385 }
@@ -386,7 +449,9 @@ trait MesosClientActor extends Actor with ActorLogging with MesosClientConnectio
386449 }
387450 acknowledge(event.getStatus)
388451 }
389-
452+ def taskFitsHeldOffers (task : TaskDef ): Boolean = {
453+ taskMatcher.matchTasksToOffers(role, Seq (task), heldOffers.map(_._2.offer), taskBuilder)._1.nonEmpty
454+ }
390455 def acknowledge (status : TaskStatus ): Unit = {
391456 if (status.hasUuid) {
392457 val ack = Call
@@ -405,7 +470,20 @@ trait MesosClientActor extends Actor with ActorLogging with MesosClientConnectio
405470
406471 }
407472 }
408-
473+ def declineOffers (offerIds : Iterable [OfferID ]) = {
474+ if (offerIds.nonEmpty) {
475+ val declineCall = Call .newBuilder
476+ .setFrameworkId(frameworkID)
477+ .setType(Call .Type .DECLINE )
478+ .setDecline(Call .Decline .newBuilder
479+ .addAllOfferIds(offerIds.asJava))
480+ .build;
481+ execInternal(declineCall).onComplete {
482+ case Success (_) => log.debug(s " successfully declined ${offerIds.size} offers " )
483+ case Failure (t) => log.error(s " decline failed ${t}" )
484+ }
485+ }
486+ }
409487 override def postStop (): Unit = {
410488 logger.info(" postStop cancelling heartbeatMonitor" )
411489 heartbeatMonitor.foreach(_.cancel())
@@ -428,20 +506,19 @@ trait MesosClientActor extends Actor with ActorLogging with MesosClientConnectio
428506 log.debug(s " received ${event.getOffersList.size} offers: ${toCompactJsonString(event);}" )
429507
430508 val agentOfferMap = event.getOffersList.asScala
431- .map(o => o.getHostname -> o.getResourcesList.asScala.filter(_.getRole == role).groupBy(_.getName)) // map hostname to resource map including only resources allocated to this role
509+ .map(o => o -> o.getResourcesList.asScala.filter(_.getRole == role).groupBy(_.getName)) // map hostname to resource map including only resources allocated to this role
432510 .filter(a => Set (" cpus" , " mem" , " ports" ).subsetOf(a._2.keySet)) // remove hosts that do not have resources allocated for all of cpus+mem+ports
433511 .toMap
434512 if (agentOfferMap.nonEmpty) {
435513 log.info(
436514 s " received ${event.getOffersList.size} offers total and ${agentOfferMap.size} usable offers for role $role" )
437- agentOfferMap.foreach(
438- o =>
439- log.info(
440- s " usable offer ${o._1}: max mem: ${o._2(" mem" ).maxBy(_.getScalar.getValue).getScalar.getValue} max cpus: ${o
441- ._2(" cpus" )
442- .maxBy(_.getScalar.getValue)
443- .getScalar
444- .getValue} max ports: ${MesosClient .countPorts(o._2(" ports" ))}" ))
515+ agentOfferMap.foreach(o =>
516+ log.info(
517+ s " usable offer ${o._1.getHostname}: max mem: ${o._2(" mem" ).maxBy(_.getScalar.getValue).getScalar.getValue} max cpus: ${o
518+ ._2(" cpus" )
519+ .maxBy(_.getScalar.getValue)
520+ .getScalar
521+ .getValue} max ports: ${MesosClient .countPorts(o._2(" ports" ))}" ))
445522
446523 log.debug(s " agent offer stats: {} " , agentOfferHistory)
447524
@@ -465,61 +542,93 @@ trait MesosClientActor extends Actor with ActorLogging with MesosClientConnectio
465542 // if some tasks matched, we explicitly accept the matched offers, and others are explicitly declined
466543
467544 // Decline the offers not selected. Sometimes these just stay dangling in mesos outstanding offers
468- val declineOfferIds =
545+ val unusedOfferIds =
469546 asScalaBuffer(event.getOffersList).map(offer => offer.getId).filter(! matchedTasks.contains(_))
470- if (declineOfferIds.nonEmpty) {
471- logger.info(s " declining ${declineOfferIds.size} offers " )
472- val declineCall = Call .newBuilder
473- .setFrameworkId(frameworkID)
474- .setType(Call .Type .DECLINE )
475- .setDecline(Call .Decline .newBuilder
476- .addAllOfferIds(seqAsJavaList(declineOfferIds)))
477- .build;
478- execInternal(declineCall)
547+ if (config.holdOffers && ! stopping) { // handle held offers
548+ // remove matched offers
549+ val matchedOffers = matchedTasks.keySet
550+ heldOffers = heldOffers -- matchedOffers // remove matched offers from heldOffers
551+
552+ // add remaining unused offers
553+ if (unusedOfferIds.nonEmpty) {
554+ val now = Instant .now()
555+ // find the usable ones
556+ val usableUnusedOffers =
557+ agentOfferMap.filter(a => unusedOfferIds.contains(a._1.getId) && ! matchedOffers.contains(a._1.getId))
558+ if (usableUnusedOffers.size > 0 ) {
559+ logger.info(s " holding ${usableUnusedOffers.size} unused offers " )
560+ // save the usable ones
561+ usableUnusedOffers.foreach(
562+ o =>
563+ heldOffers = heldOffers + (o._1.getId -> HeldOffer (
564+ o._1,
565+ now.plusSeconds(config.holdOffersTTL.toSeconds))))
566+ }
567+
568+ // remove the usable from unused
569+ val unusableUnusedOfferIds = unusedOfferIds -- heldOffers.keys
570+ // decline unused - usableUnused
571+ if (unusableUnusedOfferIds.size > 0 ) {
572+ logger.info(s " declining ${unusableUnusedOfferIds.size} unused or unusable offers " )
573+ declineOffers(unusableUnusedOfferIds)
574+ }
575+ }
576+ } else {
577+ logger.info(s " declining ${unusedOfferIds.size} unused offers " )
578+ declineOffers(unusedOfferIds)
479579 }
480580
481581 matchedTasks.foreach { offerTasks =>
482582 val taskInfos : java.lang.Iterable [TaskInfo ] = offerTasks._2.map(_._1).asJava
483583 val acceptCall = MesosClient .accept(frameworkID, Seq (offerTasks._1).asJava, taskInfos)
484584 execInternal(acceptCall).onComplete {
485585 case Success (r) =>
486- matchedTasks.foreach(entry => {
487- entry._2.foreach(task => {
488- tasks(task._1.getTaskId.getValue) match {
489- case s @ SubmitPending (reqs, promise, _) =>
490- // dig the hostname out of the offer whose agent id matches the agent id in the task info
491- val hostname =
492- event.getOffersList.asScala.find(p => p.getAgentId == task._1.getAgentId).get.getHostname
493- tasks.update(reqs.taskId, Submitted (s, task._1, entry._1, hostname, task._2, promise))
494- case previousState =>
495- log.warning(s " submitted a task that was not in SubmitPending? ${previousState}" )
496- }
497- })
498-
586+ // ACCEPT was successful, we will be updated when it changes to RUNNING
587+ case Failure (t) =>
588+ log.error(s " accept failure ${t}" )
589+ offerTasks._2.foreach(task => {
590+ tasks(task._1.getTaskId.getValue) match {
591+ case s @ Submitted (pending, taskInfo, offer, hostname, hostports, promise) =>
592+ promise.failure(t)
593+ case previousState =>
594+ log.warning(s " accepted a task that was not in Submitted? ${previousState}" )
595+ }
499596 })
500- if (! pending.isEmpty) {
501- log.warning(" still have pending tasks after OFFER + ACCEPT: " )
502- pending.foreach(t => log.info(s " pending taskid ${t.taskId}" ))
503- }
504- case Failure (t) => log.error(s " failure ${t}" )
505597 }
598+ // immediately update tasks to Submitted status
599+ offerTasks._2.foreach(task => {
600+ tasks(task._1.getTaskId.getValue) match {
601+ case s @ SubmitPending (reqs, promise, _) =>
602+ // dig the hostname out of the offer whose agent id matches the agent id in the task info
603+ val hostname =
604+ event.getOffersList.asScala.find(p => p.getAgentId == task._1.getAgentId).get.getHostname
605+ tasks.update(reqs.taskId, Submitted (s, task._1, offerTasks._1, hostname, task._2, promise))
606+ case previousState =>
607+ log.warning(s " submitted a task that was not in SubmitPending? ${previousState}" )
608+ }
609+ })
610+ }
506611
612+ val matchedTaskIds = matchedTasks.values.flatMap(_.map(_._1.getTaskId.getValue)).toSet
613+ val unmatchedPending = pending.filter(p => ! matchedTaskIds.contains(p.taskId))
614+ if (unmatchedPending.nonEmpty) {
615+ log.warning(" still have pending tasks after OFFER + ACCEPT: " )
616+ unmatchedPending.foreach(t => log.info(s " pending taskid ${t.taskId} ${t.mem}MB ${t.cpus}CPUS " ))
507617 }
508618 // generate failures for pending tasks that did not fit any offers
509619 config.failPendingOfferCycles.foreach { maxOfferCycles =>
510620 val submitPending = tasks.collect { case (_, s : SubmitPending ) => s }
511621 if (submitPending.nonEmpty) {
512622 submitPending.foreach { task =>
513- // println(s"task offerCycles:${task.offerCycles} ${maxOfferCycles}")
514623 if (task.offerCycles > maxOfferCycles) {
515624 log.warning(s " failing task ${task.reqs.taskId} after ${task.offerCycles} unmatching offer cycles " )
625+ tasks.remove(task.reqs.taskId)
516626 task.promise.failure(
517627 new CapacityFailure (
518628 task.reqs.mem.toFloat,
519629 task.reqs.cpus.toFloat,
520630 task.reqs.ports.size,
521631 remaining.values.toList))
522- tasks.remove(task.reqs.taskId)
523632 } else {
524633 tasks.update(task.reqs.taskId, task.copy(offerCycles = task.offerCycles + 1 )) // increase the offer cycles this task has seen
525634 }
@@ -530,7 +639,7 @@ trait MesosClientActor extends Actor with ActorLogging with MesosClientConnectio
530639 // store a reference of last memory offer (total) from each agent
531640 val newOfferStats = MesosClient .getOfferStats(config, role, agentOfferMap)
532641 // log the agents that previously had offers, but no longer have offers, and are not yet pruned
533- val diff = newOfferStats.keySet.diff(agentOfferMap.keySet)
642+ val diff = newOfferStats.keySet.diff(agentOfferMap.keySet.map(_.getHostname) )
534643 if (diff.nonEmpty) {
535644 log.info(s " some agents not included in offer cycle: ${diff} " )
536645 }
@@ -539,6 +648,8 @@ trait MesosClientActor extends Actor with ActorLogging with MesosClientConnectio
539648 // publish MesosAgentStats to subscribers
540649 listener.foreach(_ ! MesosAgentStats (agentOfferHistory))
541650 } else {
651+ // decline all offers
652+ declineOffers(event.getOffersList.asScala.map(_.getId))
542653 log.info(
543654 s " received ${event.getOffersList.size} offers; none usable for role $role; ${pending.size} pending tasks " )
544655 }
@@ -703,6 +814,12 @@ trait MesosClientActor extends Actor with ActorLogging with MesosClientConnectio
703814 case _ => // nothing
704815 }
705816 })
817+ // remove any held offers
818+ val rescinded = event.getRescind.getOfferId
819+ if (heldOffers.keySet.contains(rescinded)) {
820+ logger.info(s " removing held offer id ${rescinded.getValue} on rescind " )
821+ heldOffers = heldOffers - rescinded
822+ }
706823 case Event .Type .FAILURE => logger.warning(s " received failure message ${event.getFailure}" )
707824 case Event .Type .ERROR => logger.error(s " received error ${event.getError}" )
708825 case eventType => logger.warning(s " unhandled event ${toCompactJsonString(event)}" )
@@ -781,16 +898,19 @@ object MesosClient {
781898 .build
782899 def getOfferStats (config : MesosActorConfig ,
783900 role : String ,
784- agentOfferMap : Map [String , Map [String , Buffer [Resource ]]]) = {
901+ agentOfferMap : Map [Offer , Map [String , Buffer [Resource ]]]) = {
785902 // calculate expiration
786903 val expiration = Instant .now().plusSeconds(config.agentStatsTTL.toSeconds)
787904 // ports cannot be mapped to sum, need to calculate the size of ranges
788- agentOfferMap.mapValues { resources =>
789- AgentStats (
790- resources(" mem" ).map(_.getScalar.getValue).sum,
791- resources(" cpus" ).map(_.getScalar.getValue).sum,
792- countPorts(resources(" ports" )),
793- expiration)
905+ agentOfferMap.map { o =>
906+ val hostname = o._1.getHostname
907+ val resources = o._2
908+ hostname ->
909+ AgentStats (
910+ resources(" mem" ).map(_.getScalar.getValue).sum,
911+ resources(" cpus" ).map(_.getScalar.getValue).sum,
912+ countPorts(resources(" ports" )),
913+ expiration)
794914 }
795915 }
796916
0 commit comments