cyclescaler/slave/main.go

396 lines
10 KiB
Go

package main
import (
"bufio"
"bytes"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"mime/multipart"
"net/http"
"net/url"
"os"
"os/exec"
"strconv"
"strings"
"time"
"deadbeef.codes/steven/cyclescaler"
)
// Status holds progress information scraped from blender's stdout while a
// frame renders. The heartbeat goroutine forwards Output to the master.
type Status struct {
	Frame, Mem, PeakMem string // frame number and memory figures, as printed by blender
	Output              string // latest human-readable progress message
}
// Process-wide state, initialised in init() and shared by the main loop,
// the heartbeat goroutine and blender's output parser.
// NOTE(review): status and so are written by main and read by heartBeat
// without any synchronization; this is a data race under `go test -race`.
var (
	masterURL   string // base URL of the master, from $csmasterurl
	slaveName   string // this slave's name, from $csslavename
	blenderPath string // location of the blender executable

	status *Status                 // progress reported to the master
	so     *cyclescaler.SlaveOrder // current order; nil after getWork fails
)
// init loads the slave's configuration from the environment and prepares the
// shared state. The process exits via log.Fatalf if csmasterurl or
// csslavename is unset or empty.
func init() {
	for _, name := range []string{"csmasterurl", "csslavename"} {
		if os.Getenv(name) == "" {
			log.Fatalf("environment variable %s is not set", name)
		}
	}
	masterURL, slaveName = os.Getenv("csmasterurl"), os.Getenv("csslavename")

	// The blender binary's location is fixed by the container image, which is
	// a full Ubuntu userland rather than a minimal one.
	blenderPath = "/usr/local/blender/blender"

	status = &Status{Output: "initializing slave"}
	so = &cyclescaler.SlaveOrder{}

	log.Print("starting cyclescaler slave...")
	log.Printf("hi, my name is: %s", slaveName)
	log.Printf("my master is located at: %s", masterURL)
}
// main starts the heartbeat reporter and then loops forever. Each iteration
// asks the master for an order, fetches and verifies its .blend file, renders
// the requested frame and uploads the result. Any failed step is logged and
// the loop starts over; the first three steps pause before retrying.
func main() {
	go heartBeat()

	for {
		var err error

		log.Print("contacting master for next orders...")
		status.Output = "contacting master for next orders..."
		if so, err = getWork(); err != nil {
			log.Printf("couldn't get work: %v", err)
			// Simple polling until the master can push work (pubsub is TBD).
			time.Sleep(25 * time.Second)
			continue
		}

		log.Print("caching .blend file locally...")
		status.Output = "retrieving .blend file from master..."
		if err = getBlendFile(so.Token); err != nil {
			log.Printf("couldn't get blend file for work: %v", err)
			time.Sleep(5 * time.Minute)
			continue
		}

		log.Print("Ready to render, initializing blender...")
		status.Output = "Initializing blender..."
		if err = render(); err != nil {
			log.Printf("rendering token '%s', frame '%d' failed: %v", so.Token, so.FrameNumber, err)
			time.Sleep(5 * time.Minute)
			continue
		}

		log.Print("Posting work to master...")
		status.Output = "Uploading rendered frame to master..."
		if err = postWork(); err != nil {
			log.Printf("failed to post frame '%d' for token '%s': %v", so.FrameNumber, so.Token, err)
		}
	}
}
// heartBeat reports progress to the master forever. Every 15 seconds, if an
// order is held (so is non-nil with a non-empty Token), it sends the order's
// token and frame number plus status.Output to <masterURL>/heartbeat.
// Failures are only logged. The main loop keeps rendering regardless, and the
// number of consecutive missed heartbeats is tracked for the log.
func heartBeat() {
	// A bounded timeout stops a master that accepts but never answers from
	// stalling the heartbeat loop forever.
	client := &http.Client{Timeout: 10 * time.Second}
	var missedHeartbeats int
	for {
		time.Sleep(time.Second * 15)

		// Take a single snapshot of the order. main reassigns so (to nil when
		// getWork fails), so re-reading it after the nil check could panic.
		// NOTE(review): this still reads so without synchronization.
		order := so
		if order == nil || order.Token == "" {
			continue
		}

		v := url.Values{}
		v.Add("token", order.Token)
		v.Add("framenumber", strconv.Itoa(order.FrameNumber))
		v.Add("status", status.Output)
		reqURL := fmt.Sprintf("%s/heartbeat?%s", masterURL, v.Encode())
		log.Printf("sending heartbeat - %s", reqURL)

		resp, err := client.Get(reqURL)
		if err != nil {
			missedHeartbeats++
			log.Printf("http GET error: %v", err)
			log.Printf("total consecutive missed heartbeats: %d", missedHeartbeats)
			continue
		}
		// The body is unused, but it must be drained and closed. Otherwise every
		// heartbeat leaks a connection.
		_, _ = io.Copy(ioutil.Discard, resp.Body)
		resp.Body.Close()

		if resp.StatusCode != http.StatusOK {
			missedHeartbeats++
			log.Printf("heartbeat response HTTP status code is '%s', expected 200, master offline?", resp.Status)
			log.Printf("total consecutive missed heartbeats: %d", missedHeartbeats)
			log.Printf("rendering will continue and work will be uploaded once the master is available")
			continue
		}
		missedHeartbeats = 0
	}
}
// getWork asks <masterURL>/getwork for this slave's next order.
// It returns an error in any of these cases:
//   - the request fails;
//   - the master replies "none" (no work queued);
//   - the master answers with a non-200 status;
//   - the body is not valid JSON for a SlaveOrder.
func getWork() (*cyclescaler.SlaveOrder, error) {
	v := url.Values{}
	v.Add("slavename", slaveName)
	// The reply is small; a timeout keeps an unresponsive master from hanging
	// the main loop indefinitely.
	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Get(fmt.Sprintf("%s/getwork?%s", masterURL, v.Encode()))
	if err != nil {
		return nil, fmt.Errorf("http get request to master failed: %v", err)
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read response body: %v", err)
	}
	// Checked before the status code so "none" is reported the same way
	// whatever status the master pairs it with.
	if strings.TrimSpace(string(body)) == "none" {
		return nil, fmt.Errorf("no work available")
	}
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("master responded with http status '%s', expected 200", resp.Status)
	}

	// Named order (not so) to avoid shadowing the package-level variable.
	order := &cyclescaler.SlaveOrder{}
	if err := json.Unmarshal(body, order); err != nil {
		return nil, fmt.Errorf("failed to unmarshal response body to cyclescaler.SlaveOrder{}: %v", err)
	}
	return order, nil
}
// getBlendFile makes sure <token>.sha256 and <token>.blend exist in the
// working directory. Whichever is missing is downloaded from
// <masterURL>/public/blendfiles/. It then checks that the SHA-256 digest of
// the .blend equals the contents of the .sha256 file byte for byte. On a
// mismatch both files are deleted, so the next call fetches them again, and
// an error is returned.
func getBlendFile(token string) error {
	blendFileName := fmt.Sprintf("%s.blend", token)
	hashFileName := fmt.Sprintf("%s.sha256", token)

	if _, err := os.Stat(hashFileName); os.IsNotExist(err) {
		hashURL := fmt.Sprintf("%s/public/blendfiles/%s.sha256", masterURL, token)
		if err := downloadToFile(hashURL, hashFileName, nil); err != nil {
			return fmt.Errorf("failed to download hash for blend file from master: %v", err)
		}
	}
	remoteHash, err := ioutil.ReadFile(hashFileName)
	if err != nil {
		return fmt.Errorf("failed to read file '%s': %v", hashFileName, err)
	}

	hasher := sha256.New()
	if _, statErr := os.Stat(blendFileName); os.IsNotExist(statErr) {
		// Hash while downloading so the file does not have to be read twice.
		blendURL := fmt.Sprintf("%s/public/blendfiles/%s.blend", masterURL, token)
		if err := downloadToFile(blendURL, blendFileName, hasher); err != nil {
			return fmt.Errorf("failed to download blend file from master: %v", err)
		}
	} else {
		blendFile, err := os.Open(blendFileName)
		if err != nil {
			return fmt.Errorf("failed to read file '%s': %v", blendFileName, err)
		}
		_, err = io.Copy(hasher, blendFile)
		blendFile.Close()
		if err != nil {
			return fmt.Errorf("failed to copy blend file to hash engine: %v", err)
		}
	}

	// NOTE(review): this compares the raw 32-byte digest with the file
	// contents. If the master ever writes a hex-encoded digest, this can never
	// match; confirm the master's .sha256 format.
	if !bytes.Equal(hasher.Sum(nil), remoteHash) {
		os.Remove(blendFileName)
		os.Remove(hashFileName)
		return fmt.Errorf("hash of downloaded blend file does not equal hash provided by master")
	}
	return nil
}

// downloadToFile GETs rawURL and writes the body to path. When tee is non-nil,
// the body is also copied to it. The local file is only created after a 200
// reply. It is removed again if the copy or close fails, so a broken download
// is never mistaken for a cached file.
func downloadToFile(rawURL, path string, tee io.Writer) (err error) {
	resp, err := http.Get(rawURL)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("GET %s: unexpected http status '%s', expected 200", rawURL, resp.Status)
	}

	out, err := os.Create(path)
	if err != nil {
		return fmt.Errorf("failed to create file '%s': %v", path, err)
	}
	defer func() {
		if cerr := out.Close(); err == nil && cerr != nil {
			err = fmt.Errorf("failed to close file '%s': %v", path, cerr)
		}
		if err != nil {
			os.Remove(path)
		}
	}()

	var dst io.Writer = out
	if tee != nil {
		dst = io.MultiWriter(out, tee)
	}
	if _, err = io.Copy(dst, resp.Body); err != nil {
		return fmt.Errorf("failed to save download to '%s': %v", path, err)
	}
	return nil
}
// render runs blender in background mode on <so.Token>.blend to render frame
// so.FrameNumber as PNG, with output pattern "<token>_#####" in the working
// directory. Blender's stdout is echoed and parsed into status while it runs;
// its stderr is forwarded to ours.
// If no "Fra:" line is seen (status.Frame stays empty), the requested frame
// is assumed to lie past the end of the animation. The master is then told,
// via /setfinalframe, that the final frame is so.FrameNumber-1.
func render() error {
	fileName := fmt.Sprintf("%s.blend", so.Token)
	// Argument order is kept as-is: blender processes its arguments left to
	// right, so output settings must come before the -f render request.
	cmdArgs := []string{
		"-b", fileName,
		"-F", "PNG",
		"-o", fmt.Sprintf("%s_#####", so.Token),
		"-f", strconv.Itoa(so.FrameNumber),
	}

	// Clear the frame left over from a previous render. Otherwise the
	// final-frame check below can never fire after the first order.
	status.Frame = ""

	cmd := exec.Command(blenderPath, cmdArgs...)
	cmd.Stderr = os.Stderr
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return fmt.Errorf("error creating stdout pipe for blender: %v", err)
	}
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("error starting blender: %v", err)
	}

	// All reads must finish before Wait, because Wait closes the pipe.
	// Scanning in this goroutine guarantees that.
	scanner := bufio.NewScanner(stdout)
	for scanner.Scan() {
		line := scanner.Text()
		updateStatusFromBlender(line)
		fmt.Printf("Frame: %s | Status: %s\n", status.Frame, status.Output)
		fmt.Println(line)
	}
	if err := scanner.Err(); err != nil {
		log.Printf("error reading blender output: %v", err)
	}
	// If the scanner stopped early (e.g. on an over-long line), drain the rest
	// so blender never blocks writing into a full pipe.
	_, _ = io.Copy(ioutil.Discard, stdout)

	if err := cmd.Wait(); err != nil {
		return fmt.Errorf("error waiting for blender to finish: %v", err)
	}

	if status.Frame == "" { // reached past the last frame
		v := url.Values{}
		v.Add("token", so.Token)
		v.Add("finalframe", strconv.Itoa(so.FrameNumber-1))
		resp, err := http.Get(fmt.Sprintf("%s/setfinalframe?%s", masterURL, v.Encode()))
		if err != nil {
			return fmt.Errorf("failed to inform master last frame was reached: %v", err)
		}
		resp.Body.Close()
	}
	return nil
}

// updateStatusFromBlender updates status from one line of blender stdout.
// Only lines starting with "Fra:" are used. The line is split on " | ";
// field 0 holds the frame and memory figures, and fields from index 4 on
// (presumably the progress message) become status.Output. Missing pieces are
// skipped instead of causing an index-out-of-range panic.
func updateStatusFromBlender(line string) {
	fields := strings.Split(line, " | ")
	if !strings.HasPrefix(fields[0], "Fra:") {
		return
	}
	frameMem := strings.Split(strings.TrimPrefix(fields[0], "Fra:"), " ")
	status.Frame = frameMem[0]
	if len(frameMem) > 1 {
		status.Mem = frameMem[1]
	}
	if len(frameMem) > 4 {
		status.PeakMem = frameMem[4]
	}
	if len(fields) > 4 {
		status.Output = fmt.Sprintf("%v", fields[4:])
	}
}
// validates work and the uploads it to master
func postWork() error {
pngFileName := ""
dir, err := os.Open(".")
if err != nil {
return fmt.Errorf("failed to open current directory: %v", err)
}
defer dir.Close()
list, _ := dir.Readdirnames(0)
for _, name := range list {
if strings.Contains(name, ".png") {
pngFileName = name
}
}
if pngFileName == "" {
return fmt.Errorf("failed to local .png file in local directory - be sure .blend is not configured to save render outside of './'")
}
bodyBuf := &bytes.Buffer{}
bodyWriter := multipart.NewWriter(bodyBuf)
// this step is very important
fileWriter, err := bodyWriter.CreateFormFile("uploadfile", pngFileName)
if err != nil {
return fmt.Errorf("failed creating form writer: %v", err)
}
// open file handle
fh, err := os.Open(pngFileName)
if err != nil {
return fmt.Errorf("error opening file '%s' : %v", pngFileName, err)
}
defer fh.Close()
//iocopy
_, err = io.Copy(fileWriter, fh)
if err != nil {
return fmt.Errorf("failed to copy png file '%s' to form file writer: %v", pngFileName, err)
}
contentType := bodyWriter.FormDataContentType()
tokenWriter, err := bodyWriter.CreateFormField("token")
if err != nil {
return fmt.Errorf("failed creating form writer: %v", err)
}
tokenWriter.Write([]byte(so.Token))
frameNumberWriter, err := bodyWriter.CreateFormField("framenumber")
if err != nil {
return fmt.Errorf("failed creating form writer: %v", err)
}
frameNumberWriter.Write([]byte(strconv.Itoa(so.FrameNumber)))
bodyWriter.Close()
backoffMultiplier := 1
for {
resp, err := http.Post(fmt.Sprintf("%s/postwork", masterURL), contentType, bodyBuf)
if err != nil {
return fmt.Errorf("http post error: %v", err)
}
if resp.StatusCode == 200 {
break
}
backoff := 30 * backoffMultiplier
log.Printf("failed to post work to master: received response http status '%s', expected 200", resp.Status)
log.Printf("Retrying in %d seconds...", backoff)
time.Sleep(time.Second * time.Duration(backoff))
if backoffMultiplier < 60 {
backoffMultiplier++
}
}
err = os.Remove(pngFileName)
if err != nil {
return fmt.Errorf("failed remove png file after upload: %v", err)
}
return nil
}