cyclescaler/master/main.go

638 lines
19 KiB
Go

package main
import (
"bytes"
"crypto/md5"
"crypto/sha256"
"encoding/json"
"fmt"
"html/template"
"image/png"
"io"
"log"
"net/http"
"os"
"os/signal"
"sort"
"strconv"
"strings"
"time"
"unicode"
"deadbeef.codes/steven/cyclescaler"
"github.com/disintegration/imaging"
)
// Wip (work in progress) is instantiated for each frame / task that a slave is currently working on.
// A Wip is removed from its job when the slave posts the finished frame, or when the watchdog
// decides contact with the slave has been lost.
type Wip struct {
FrameNumber int // frame number that was handed to the slave
HeartbeatTimestamp time.Time // set when the frame is dispatched and refreshed on every heartbeat
Output string // latest status text: a placeholder at dispatch, then whatever the slave reports
Timeout bool // Haven't received a heartbeat and slated for cleanup (NOTE(review): never set in this file)
CompletePercentage int // progress computed from the tile counts found in the heartbeat status
SlaveName string // name of the slave processing this frame
BadHealth bool // If we're missing heartbeats; maintained by the watchdog
}
// BatchJob is instantiated for each .blend file uploaded and contains data related to the total rendering that needs to happen.
// The BatchJob contains the state of all the frames in the batch job. A frame can be in 3 different states. It can
// be sitting in RemainingFrames, it can be in a Wip, or it can be in CompletedFrames. When it moves to one state,
// it is removed from the previous one. If a Wip times out, the watchdog throws its frame back into the remaining
// frames state.
// We use 3 slices for tracking state instead of keeping the state inside of a "frame" object because it scales better when
// searching for frames in a specific state.
type BatchJob struct {
Token string // the token for the blend file; also the key of this job in batchJobs
BlendFileName string // original blend file name as sent by the uploader
// Job state
Started bool // false until the render settings have been submitted; jobs that are not started get no work dispatched
Completed bool // whether the job is completed or not
CompletePercentage int // completed frames as a percentage of NumberOfFrames, updated when a frame is posted
// Frame state
RemainingFrames []int // these are "undispatched" frames. If it's in progress, it's not in here, doesn't mean it's rendered.
Wips []Wip // work in progress, frames which have been scheduled to be rendered by a slave. Contains heartbeat timestamp
CompletedFrames []int // frames which have been completed
NumberOfFrames int // Always equal to len(RemainingFrames) + len(Wips) + len (CompletedFrames)
StartTime string // RFC 3339 timestamp taken when the blend file was uploaded
}
// HomePage is the data that gets served to templates/home.html.
type HomePage struct {
NotNoJobs bool // This is purposely a dumb name. True if there are any previous or ongoing jobs, i.e. it is only false when this master has just come online and nothing has ever been uploaded to it. If it's true, the home page will display the segment listing ongoing jobs.
Jobs *map[string]*BatchJob // pointer to the global job table; left nil when there are no jobs
Token string // This is the token that's used if a new batch job were to be uploaded
}
// JobStatusPageImages is instantiated for each image on the status page which has a Thumbnail
// as well as a RenderedFramePath. This struct maps the two together.
type JobStatusPageImages struct {
ThumbnailPath string // path of the scaled-down preview under public/renderedframes-thumbnail
RenderedFramePath string // path of the full-size frame under public/renderedframes
}
// JobStatusPage is the data that is served to templates/jobStatus.html.
type JobStatusPage struct {
Images []JobStatusPageImages // one entry per .png thumbnail found for the job, sorted by thumbnail path
UseTinyImages bool // set by jobStatusHandler when there are more than 500 images
Job *BatchJob // the job whose status is being displayed
CompleteFrames string // NOTE(review): not populated by jobStatusHandler in this file
LatestImage string // rendered-frame path of the last image after sorting, or a blank placeholder when there are none
InProgress bool // true while the job has at least one Wip
}
// InvalidMimePage is the data that is served to templates/invalidMime.html when an upload is
// rejected because of its Content-Type.
type InvalidMimePage struct {
UploadedFileMime string // the Content-Type that came with the rejected upload
}
// batchJobs holds every batch job known to this master. It exists only in memory and is
// allocated in main.
// NOTE(review): the HTTP handlers and the watchdog goroutine all read and write this map and
// nothing in this file guards it with a lock — confirm whether that is intended.
var (
batchJobs map[string]*BatchJob // Uses session token as key
)
// main allocates the in-memory job table, starts the watchdog and the HTTP
// server (port 8097) in background goroutines, and then blocks until the
// process receives an interrupt signal.
func main() {
	batchJobs = make(map[string]*BatchJob)
	go watchdogTimer()

	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)

	go func() {
		// Static assets live under ./public and are exposed below /public/.
		static := http.FileServer(http.Dir("public"))
		http.Handle("/public/", http.StripPrefix("/public/", static))

		routes := []struct {
			pattern string
			handler func(http.ResponseWriter, *http.Request)
		}{
			{"/", homePageHandler},
			{"/settings", settingsPageHandler},
			{"/jobStatus", jobStatusHandler},
			{"/getwork", getWorkHandler},
			{"/postwork", postWorkHandler},
			{"/heartbeat", heartbeatHandler},
			{"/404", notFoundHandler},
		}
		for _, route := range routes {
			http.HandleFunc(route.pattern, route.handler)
		}

		// ListenAndServe only returns on failure, which is fatal for the master.
		err := http.ListenAndServe(":8097", nil)
		log.Fatal(err)
	}()

	log.Println("started cyclescaler on 8097")
	<-interrupt
	fmt.Println("Shutting server down...")
}
// homePageHandler serves the landing page and accepts .blend uploads.
//
// GET renders templates/home.html with the list of known batch jobs and a
// token to be used by the next upload. Any other method is treated as a
// multipart upload carrying the form fields "token" and "uploadfile": the
// file is stored as public/blendfiles/<token>.blend, its SHA-256 digest is
// written next to it as <token>.sha256, a BatchJob is registered under the
// token and only then is the client redirected to /settings.
//
// Failures are reported to the client with an HTTP error status instead of
// terminating the process or returning an empty 200 response.
//
// NOTE(review): batchJobs is accessed here without any locking.
func homePageHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method == http.MethodGet {
		renderHomePage(w)
		return
	}
	receiveBlendUpload(w, r)
}

// renderHomePage writes templates/home.html populated with the current jobs and a new upload token.
func renderHomePage(w http.ResponseWriter) {
	// The token is the MD5 of the current Unix time in seconds, so two page
	// loads within the same second receive the same token.
	h := md5.New()
	io.WriteString(h, strconv.FormatInt(time.Now().Unix(), 10))

	homePage := HomePage{Token: fmt.Sprintf("%x", h.Sum(nil))}
	if len(batchJobs) > 0 {
		homePage.NotNoJobs = true
		homePage.Jobs = &batchJobs
	}

	t, err := template.ParseFiles("templates/home.html")
	if err != nil {
		log.Printf("could not parse home.html template file: %v", err)
		http.Error(w, "internal server error", http.StatusInternalServerError)
		return
	}
	if err := t.Execute(w, homePage); err != nil {
		log.Printf("could not execute home.html template file: %v", err)
	}
}

// isSafeUploadToken reports whether token is non-empty and consists only of ASCII letters and digits.
func isSafeUploadToken(token string) bool {
	if token == "" {
		return false
	}
	for _, c := range token {
		switch {
		case c >= '0' && c <= '9', c >= 'a' && c <= 'z', c >= 'A' && c <= 'Z':
		default:
			return false
		}
	}
	return true
}

// receiveBlendUpload stores an uploaded .blend file plus its SHA-256 digest and registers the batch job.
func receiveBlendUpload(w http.ResponseWriter, r *http.Request) {
	if err := r.ParseMultipartForm(32 << 22); err != nil {
		log.Printf("failed to parse form during file upload: %v", err)
		http.Error(w, "could not parse upload form", http.StatusBadRequest)
		return
	}
	tokens, ok := r.Form["token"]
	if !ok || len(tokens) == 0 {
		log.Printf("couldn't GET token")
		http.Error(w, "missing token", http.StatusBadRequest)
		return
	}
	token := tokens[0]
	// The token becomes part of a file name and of the redirect URL, so
	// anything that is not plain alphanumeric is refused.
	if !isSafeUploadToken(token) {
		log.Printf("rejected upload with invalid token %q", token)
		http.Error(w, "invalid token", http.StatusBadRequest)
		return
	}

	file, handler, err := r.FormFile("uploadfile")
	if err != nil {
		log.Printf("could not read uploadfile from form: %v", err)
		http.Error(w, "missing uploadfile", http.StatusBadRequest)
		return
	}
	defer file.Close()

	mime := handler.Header.Get("Content-Type")
	if mime != "application/x-blender" && mime != "application/octet-stream" {
		t, err := template.ParseFiles("templates/invalidMime.html")
		if err != nil {
			log.Printf("could not parse invalidMime.html template file: %v", err)
			http.Error(w, "internal server error", http.StatusInternalServerError)
			return
		}
		if err := t.Execute(w, &InvalidMimePage{UploadedFileMime: mime}); err != nil {
			log.Printf("could not execute invalidMime.html template file: %v", err)
		}
		return
	}

	// Store the uploaded file in the public directory while feeding the same
	// bytes through SHA-256. O_TRUNC keeps a shorter re-upload from leaving
	// stale bytes behind.
	f, err := os.OpenFile(fmt.Sprintf("public/blendfiles/%s.blend", token), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
	if err != nil {
		log.Printf("could not create /public/blendfiles/%s.blend: %v", token, err)
		http.Error(w, "could not store upload", http.StatusInternalServerError)
		return
	}
	hash := sha256.New()
	_, err = io.Copy(io.MultiWriter(f, hash), file)
	if cerr := f.Close(); err == nil {
		err = cerr
	}
	if err != nil {
		log.Printf("could not copy file to multiWriter (file output and hash engine): %v", err)
		http.Error(w, "could not store upload", http.StatusInternalServerError)
		return
	}

	// Store the raw digest bytes in a sibling file in the public directory.
	hf, err := os.OpenFile(fmt.Sprintf("public/blendfiles/%s.sha256", token), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
	if err != nil {
		log.Printf("could not create /public/blendfiles/%s.sha256: %v", token, err)
		http.Error(w, "could not store upload", http.StatusInternalServerError)
		return
	}
	_, err = io.Copy(hf, bytes.NewReader(hash.Sum(nil)))
	if cerr := hf.Close(); err == nil {
		err = cerr
	}
	if err != nil {
		log.Printf("could not copy hash of uploaded blend file to /public/blendfiles/%s.sha256: %v", token, err)
		http.Error(w, "could not store upload", http.StatusInternalServerError)
		return
	}

	// Create the job and set known parameters, then send the user on to the
	// settings page. Redirecting last guarantees the job exists when the
	// settings page is requested.
	batchJobs[token] = &BatchJob{
		BlendFileName: handler.Filename,
		Token:         token,
		StartTime:     time.Now().Format(time.RFC3339),
	}
	http.Redirect(w, r, fmt.Sprintf("/settings?token=%s", token), http.StatusSeeOther)
}
// settingsPageHandler collects the render settings for a batch job.
//
// GET renders templates/settings.html for the job named by the "token" query
// parameter. Any other method is treated as the multipart settings form: the
// "renderframes" field (e.g. "1,4,10-12") is expanded into the job's
// RemainingFrames, the output directories for the job are created, the job is
// marked as started and the client is redirected to /jobStatus.
//
// A token that does not match a known job is redirected to /404 on POST
// rather than dereferencing a nil job.
func settingsPageHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method == http.MethodGet {
		if err := r.ParseForm(); err != nil {
			log.Printf("failed to parse form in settingsHandler GET: %v", err)
			http.Error(w, "bad request", http.StatusBadRequest)
			return
		}
		tokens, ok := r.Form["token"]
		if !ok || len(tokens) == 0 {
			log.Println("couldn't get token in settingsHandler GET")
			http.Error(w, "missing token", http.StatusBadRequest)
			return
		}
		t, err := template.ParseFiles("templates/settings.html")
		if err != nil {
			log.Printf("could not parse settings.html template file: %v", err)
			http.Error(w, "internal server error", http.StatusInternalServerError)
			return
		}
		// An unknown token hands a nil *BatchJob to the template, as before.
		if err := t.Execute(w, batchJobs[tokens[0]]); err != nil {
			log.Printf("could not execute settings.html template file: %v", err)
		}
		return
	}

	// Begin render and redirect person to status page for that render.
	if err := r.ParseMultipartForm(32 << 20); err != nil {
		log.Printf("failed to parse form in settingsHandler POST: %v", err)
		http.Error(w, "bad request", http.StatusBadRequest)
		return
	}
	tokens, ok := r.Form["token"]
	if !ok || len(tokens) == 0 {
		log.Println("couldn't get token in settingsHandler POST")
		http.Error(w, "missing token", http.StatusBadRequest)
		return
	}
	token := tokens[0]
	job, ok := batchJobs[token]
	if !ok {
		log.Printf("settings posted for token '%s' which doesn't exist", token)
		http.Redirect(w, r, "/404", http.StatusSeeOther)
		return
	}

	var renderFrames string
	if values, ok := r.Form["renderframes"]; ok && len(values) > 0 {
		renderFrames = stripSpaces(values[0])
	}
	log.Printf("renderFrames = %s", renderFrames)
	remainingFrames := parseFrameSpec(renderFrames)

	// Create the output directories before the job becomes eligible for
	// dispatch, so a slave can never post a frame into a missing directory.
	for _, dir := range []string{
		fmt.Sprintf("public/renderedframes/%s", token),
		fmt.Sprintf("public/renderedframes-thumbnail/%s", token),
	} {
		if err := os.MkdirAll(dir, os.ModePerm); err != nil {
			log.Printf("could not create directory '%s': %v", dir, err)
			http.Error(w, "internal server error", http.StatusInternalServerError)
			return
		}
	}

	job.RemainingFrames = remainingFrames
	job.NumberOfFrames = len(remainingFrames)
	job.Started = true
	http.Redirect(w, r, fmt.Sprintf("/jobStatus?token=%s", token), http.StatusSeeOther)
}

// parseFrameSpec expands a whitespace-free frame list such as "1,4,10-12" into
// [1 4 10 11 12]. Empty or malformed entries are logged and skipped instead of
// being turned into frame 0; a range whose end is below its start yields no frames.
func parseFrameSpec(spec string) []int {
	var frames []int
	for _, part := range strings.Split(spec, ",") {
		if part == "" {
			continue
		}
		dash := strings.Index(part, "-")
		if dash < 0 {
			frame, err := strconv.Atoi(part)
			if err != nil {
				log.Printf("error converting currentNumber '%s' to integer: %v", part, err)
				continue
			}
			frames = append(frames, frame)
			continue
		}
		first, err := strconv.Atoi(part[:dash])
		if err != nil {
			log.Printf("error converting currentNumber '%s' to integer: %v", part[:dash], err)
			continue
		}
		last, err := strconv.Atoi(part[dash+1:])
		if err != nil {
			log.Printf("error converting currentNumber '%s' to integer: %v", part[dash+1:], err)
			continue
		}
		for j := first; j < last+1; j++ {
			frames = append(frames, j)
		}
	}
	return frames
}
// jobStatusHandler renders templates/jobStatus.html for the job named by the
// "token" parameter: the job itself, one entry per .png thumbnail found in the
// job's thumbnail directory (sorted by path), the most recent image and
// whether any frame is currently in progress.
//
// An unknown token is redirected to /404. A thumbnail directory that does not
// exist yet (the job has not been started) is shown as a job without images
// rather than terminating the server.
func jobStatusHandler(w http.ResponseWriter, r *http.Request) {
	if err := r.ParseForm(); err != nil {
		log.Printf("failed to parse form in statusHandler: %v", err)
		http.Error(w, "bad request", http.StatusBadRequest)
		return
	}
	tokens, ok := r.Form["token"]
	if !ok || len(tokens) == 0 {
		log.Println("couldn't get token in statusHandler")
		http.Error(w, "missing token", http.StatusBadRequest)
		return
	}
	token := tokens[0]
	job, ok := batchJobs[token]
	if !ok {
		log.Println("request to see status page with token that doesn't exist")
		http.Redirect(w, r, "/404", http.StatusSeeOther)
		return
	}

	t, err := template.ParseFiles("templates/jobStatus.html")
	if err != nil {
		log.Printf("could not parse jobStatus.html template file: %v", err)
		http.Error(w, "internal server error", http.StatusInternalServerError)
		return
	}

	var files []os.FileInfo
	dir, err := os.Open(fmt.Sprintf("public/renderedframes-thumbnail/%s", token))
	switch {
	case err == nil:
		files, err = dir.Readdir(-1)
		dir.Close()
		if err != nil {
			log.Printf("could not list thumbnails for job '%s': %v", token, err)
			http.Error(w, "internal server error", http.StatusInternalServerError)
			return
		}
	case os.IsNotExist(err):
		// No directory means no frame has been rendered yet.
	default:
		log.Printf("could not open thumbnail directory for job '%s': %v", token, err)
		http.Error(w, "internal server error", http.StatusInternalServerError)
		return
	}

	jobStatusPage := JobStatusPage{Job: job}
	for _, file := range files {
		if !strings.HasSuffix(file.Name(), ".png") {
			continue
		}
		jobStatusPage.Images = append(jobStatusPage.Images, JobStatusPageImages{
			ThumbnailPath:     fmt.Sprintf("public/renderedframes-thumbnail/%s/%s", token, file.Name()),
			RenderedFramePath: fmt.Sprintf("public/renderedframes/%s/%s", token, file.Name()),
		})
	}
	// Plain string ordering of the paths.
	sort.Slice(jobStatusPage.Images, func(i, j int) bool {
		return jobStatusPage.Images[i].ThumbnailPath < jobStatusPage.Images[j].ThumbnailPath
	})

	if n := len(jobStatusPage.Images); n > 0 {
		jobStatusPage.LatestImage = jobStatusPage.Images[n-1].RenderedFramePath
	} else {
		jobStatusPage.LatestImage = "public/img/blankthumbnail.png"
	}
	jobStatusPage.UseTinyImages = len(jobStatusPage.Images) > 500
	jobStatusPage.InProgress = len(job.Wips) > 0

	if err := t.Execute(w, jobStatusPage); err != nil {
		log.Printf("could not execute jobStatus.html template file: %v", err)
	}
}
// getWorkHandler hands the next undispatched frame to the slave named by the
// "slavename" parameter.
//
// The first started, uncompleted job that still has remaining frames supplies
// the frame; the response body is the JSON encoding of a cyclescaler.SlaveOrder
// and the frame moves from RemainingFrames into a new Wip. A job with neither
// remaining frames nor Wips is marked Completed along the way. When no job has
// work, the body is the literal text "none".
//
// The order is marshalled before any job state is touched, so a marshalling
// failure can no longer strand a frame in Wips.
func getWorkHandler(w http.ResponseWriter, r *http.Request) {
	if err := r.ParseForm(); err != nil {
		log.Printf("failed to parse form in getWorkHandler: %v", err)
		http.Error(w, "bad request", http.StatusBadRequest)
		return
	}
	names, ok := r.Form["slavename"]
	if !ok || len(names) == 0 {
		log.Println("couldn't get slavename getWorkHandler")
		http.Error(w, "missing slavename", http.StatusBadRequest)
		return
	}
	slaveName := names[0]

	for _, job := range batchJobs {
		if job.Completed || !job.Started { // Jobs in these states are skipped
			continue
		}
		if len(job.RemainingFrames) == 0 {
			if len(job.Wips) == 0 { // The job is now complete, but not yet marked as complete
				job.Completed = true
			}
			continue
		}

		order := &cyclescaler.SlaveOrder{}
		order.Token = job.Token
		order.FrameNumber = job.RemainingFrames[0] // We grab the next frame to render
		payload, err := json.Marshal(order)
		if err != nil {
			log.Printf("failed to marshal job '%s' to json: %v", job.BlendFileName, err)
			http.Error(w, "internal server error", http.StatusInternalServerError)
			return
		}

		// Commit the dispatch: drop the frame from the queue and track it as a Wip.
		job.RemainingFrames = job.RemainingFrames[1:]
		job.Wips = append(job.Wips, Wip{
			FrameNumber:        order.FrameNumber,
			HeartbeatTimestamp: time.Now(),
			Output:             "Job dispatched to slave...",
			SlaveName:          slaveName,
		})

		if _, err := w.Write(payload); err != nil {
			// The frame stays in Wips; the watchdog re-queues it once heartbeats fail to arrive.
			log.Printf("failed to write json to http response writer for job '%s': %v", job.BlendFileName, err)
		}
		return
	}

	w.Write([]byte("none")) // No work available, terminate the instance
}
// postWorkHandler accepts a rendered frame from a slave.
//
// The multipart form must carry "token", "framenumber" and the image itself
// as "uploadfile". The image is stored under public/renderedframes/<token>/,
// the frame is moved into the job's CompletedFrames and a 240x135-bounded PNG
// thumbnail is written under public/renderedframes-thumbnail/<token>/.
//
// Differences from a naive implementation, all of which protect the master:
//   - the job is looked up before anything is written to disk;
//   - a frame is counted as completed only if it was still pending (in Wips,
//     or re-queued into RemainingFrames by the watchdog), so a late or
//     repeated post cannot be counted twice;
//   - an undecodable image or a thumbnail write failure is logged instead of
//     terminating the process;
//   - the percentage is not computed for a job with zero frames.
func postWorkHandler(w http.ResponseWriter, r *http.Request) {
	log.Println("received post request to upload work")
	if err := r.ParseMultipartForm(32 << 20); err != nil {
		log.Printf("failed to parse multipart form in postWorkHandler: %v", err)
		http.Error(w, "bad request", http.StatusBadRequest)
		return
	}
	tokens, ok := r.Form["token"]
	if !ok || len(tokens) == 0 {
		log.Printf("couldn't get token for postWorkHandler")
		http.Error(w, "missing token", http.StatusBadRequest)
		return
	}
	token := tokens[0]
	frameValues, ok := r.Form["framenumber"]
	if !ok || len(frameValues) == 0 {
		log.Printf("couldn't get framenumber for postWorkHandler")
		http.Error(w, "missing framenumber", http.StatusBadRequest)
		return
	}
	frameNumber, err := strconv.Atoi(frameValues[0])
	if err != nil {
		log.Printf("failed to convert frame number '%s' to integer: %v", frameValues[0], err)
		http.Error(w, "invalid framenumber", http.StatusBadRequest)
		return
	}
	log.Printf("postWork framenumber '%d', token '%s'", frameNumber, token)

	job, ok := batchJobs[token]
	if !ok {
		log.Printf("received a rendered frame '%d' for job with token '%s' but couldn't find this as an active job. master may have been restarted while render was in progress", frameNumber, token)
		http.Error(w, "unknown job", http.StatusNotFound)
		return
	}

	file, handler, err := r.FormFile("uploadfile")
	if err != nil {
		log.Printf("could not read uploadfile in postWorkHandler: %v", err)
		http.Error(w, "missing uploadfile", http.StatusBadRequest)
		return
	}
	defer file.Close()

	// The file name ends up in a path below the job's directory.
	name := handler.Filename
	if name == "" || name == "." || name == ".." || strings.ContainsAny(name, `/\`) {
		log.Printf("rejected rendered frame with invalid file name %q", name)
		http.Error(w, "invalid file name", http.StatusBadRequest)
		return
	}

	framePath := fmt.Sprintf("public/renderedframes/%s/%s", token, name)
	if err := saveRenderedFrame(framePath, file); err != nil {
		log.Printf("failed to save local image file from uploaded render frame '%d', token '%s': %v", frameNumber, token, err)
		http.Error(w, "could not store frame", http.StatusInternalServerError)
		return
	}
	log.Println("finished saving uploaded file")

	// Move the frame out of whichever pending state it is in.
	pending := false
	for i := range job.Wips {
		if job.Wips[i].FrameNumber == frameNumber {
			job.Wips = append(job.Wips[:i], job.Wips[i+1:]...) // Remove it from the Wips
			pending = true
			break
		}
	}
	if !pending {
		// The watchdog may have given up on this slave and re-queued the frame.
		for i, queued := range job.RemainingFrames {
			if queued == frameNumber {
				job.RemainingFrames = append(job.RemainingFrames[:i], job.RemainingFrames[i+1:]...)
				pending = true
				break
			}
		}
	}
	if pending {
		job.CompletedFrames = append(job.CompletedFrames, frameNumber)
		if job.NumberOfFrames > 0 {
			job.CompletePercentage = (len(job.CompletedFrames) * 100) / job.NumberOfFrames
		}
	} else {
		log.Printf("frame '%d' of job '%s' was not pending; stored the file without counting it again", frameNumber, token)
	}

	thumbPath := fmt.Sprintf("public/renderedframes-thumbnail/%s/%s", token, name)
	if err := writeFrameThumbnail(framePath, thumbPath); err != nil {
		// The frame itself is safely stored, so a missing thumbnail is not fatal.
		log.Printf("failed to create thumbnail for frame '%d', token '%s': %v", frameNumber, token, err)
	}
}

// saveRenderedFrame copies src into the file at path, replacing any previous content.
func saveRenderedFrame(path string, src io.Reader) error {
	dst, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
	if err != nil {
		return err
	}
	_, err = io.Copy(dst, src)
	if cerr := dst.Close(); err == nil {
		err = cerr
	}
	return err
}

// writeFrameThumbnail decodes the image at framePath and writes a PNG copy scaled to fit 240x135 at thumbPath.
func writeFrameThumbnail(framePath, thumbPath string) error {
	src, err := imaging.Open(framePath)
	if err != nil {
		return fmt.Errorf("failed to decode uploaded file during thumbnail creation: %v", err)
	}
	thumb := imaging.Fit(src, 240, 135, imaging.Lanczos)
	out, err := os.Create(thumbPath)
	if err != nil {
		return err
	}
	err = png.Encode(out, thumb)
	if cerr := out.Close(); err == nil {
		err = cerr
	}
	return err
}
// A slave is checking in to tell us he's still working hard. Also, provides an update on the current job.
func heartbeatHandler(w http.ResponseWriter, r *http.Request) {
fmt.Println("received post request for heartbeat")
err := r.ParseForm()
if err != nil {
log.Printf("failed to parse multipart form in heartbeatHandler")
return
}
token, ok := r.Form["token"]
if !ok {
log.Printf("couldn't get token for heartbeatHandler")
return
}
status, ok := r.Form["status"]
if !ok {
log.Printf("couldn't get status for heartbeatHandler")
}
_, ok = r.Form["framenumber"]
if !ok {
log.Printf("couldn't get framenumber for heartbeatHandler")
return
}
frameNumber, err := strconv.Atoi(r.Form["framenumber"][0])
if err != nil {
log.Printf("failed to convert frame number '%s' to integer: %v", r.Form["framenumber"][0], err)
return
}
_, ok = batchJobs[token[0]]
if !ok {
log.Printf("received heartbeat for batch job with token '%s' but it doesn't exist", token[0])
return
}
job := batchJobs[token[0]]
var progress int
if strings.Contains(status[0], "View Layer Rendered") {
progString := strings.Split(status[0], "View Layer Rendered ")[1]
progString = strings.Split(progString, " Tiles")[0]
tiles := strings.Split(progString, "/")
currentTile, err := strconv.Atoi(tiles[0])
if err != nil {
log.Printf("couldn't convert current tile '%s' to integer: %v", tiles[0], err)
}
endTile, err := strconv.Atoi(tiles[1])
if err != nil {
log.Printf("couldn't convert endTile '%s' to integer: %v", tiles[1], err)
}
log.Printf("currenttile: %d\nendTile: %d", currentTile, endTile)
progress = (currentTile * 100) / endTile
}
ok = false
for i, wip := range job.Wips {
if wip.FrameNumber == frameNumber {
log.Printf("received heartbeat for batch job '%s' - current status '%s' - current progress '%d'", token[0], status[0], progress)
ok = true
job.Wips[i].HeartbeatTimestamp = time.Now()
job.Wips[i].Output = status[0]
job.Wips[i].CompletePercentage = progress
break
}
}
if !ok {
log.Printf("received heartbeat for frame '%d' for batch job '%s' but the frame is not considered to be in progress?", frameNumber, token[0])
}
}
// watchdogTimer runs forever, inspecting every Wip of every job once a minute.
//
// A Wip whose last heartbeat is older than one minute is flagged BadHealth
// (and un-flagged once heartbeats resume). A Wip whose last heartbeat is older
// than five minutes is considered lost: it is dropped and its frame is pushed
// to the front of the job's RemainingFrames so the next slave asking for work
// picks it up. Every stale Wip of a job is handled in the same pass.
//
// NOTE(review): this goroutine mutates the jobs in batchJobs without any
// locking while HTTP handlers do the same.
func watchdogTimer() {
	ticker := time.NewTicker(time.Minute) // We run this every minute
	defer ticker.Stop()

	for range ticker.C {
		for _, job := range batchJobs {
			// Filter in place: live Wips are kept, dead ones are re-queued.
			alive := job.Wips[:0]
			for _, wip := range job.Wips {
				idle := time.Since(wip.HeartbeatTimestamp)
				if idle > time.Minute*5 {
					log.Printf("slave for frame '%d' on job '%s' hasn't checked in for over 5 minutes... considering them dead", wip.FrameNumber, job.Token)
					job.RemainingFrames = append([]int{wip.FrameNumber}, job.RemainingFrames...) // Throw it to the front of the queue
					continue
				}
				wip.BadHealth = idle > time.Minute*1
				alive = append(alive, wip)
			}
			job.Wips = alive
		}
	}
}
// notFoundHandler renders templates/404.html with a 404 Not Found status.
//
// If the template cannot be parsed the failure is logged and a plain-text 404
// is sent instead; a broken template no longer terminates the server.
func notFoundHandler(w http.ResponseWriter, r *http.Request) {
	t, err := template.ParseFiles("templates/404.html")
	if err != nil {
		log.Printf("could not parse 404.html template file - you know shit's going wrong when...: %v", err)
		http.Error(w, "404 page not found", http.StatusNotFound)
		return
	}
	w.WriteHeader(http.StatusNotFound)
	if err := t.Execute(w, nil); err != nil {
		log.Printf("could not execute 404.html template file: %v", err)
	}
}
// stripSpaces returns str with every character that unicode.IsSpace reports
// as whitespace removed; all other characters are kept in their original order.
func stripSpaces(str string) string {
	var kept strings.Builder
	for _, c := range str {
		if unicode.IsSpace(c) {
			continue // whitespace is dropped
		}
		kept.WriteRune(c)
	}
	return kept.String()
}