Commit 88a018bf authored by Mitchell Hashimoto's avatar Mitchell Hashimoto

packer: Work on communicators... WIP

parent a2bf964f
......@@ -60,9 +60,6 @@ func (b *Builder) Run(ui packer.Ui, hook packer.Hook) {
MaxCount: 0,
}
hook.Run(HookPreLaunch, nil, ui)
return
ui.Say("Launching a source AWS instance...\n")
runResp, err := ec2conn.RunInstances(runOpts)
if err != nil {
......
package packer
import "io"
// A Communicator is the interface used to communicate with the machine
// that exists that will eventually be packaged into an image. Communicators
// allow you to execute remote commands, upload files, etc.
//
// Communicators must be safe for concurrency, meaning multiple calls to
// Start or any other method may be called at the same time.
type Communicator interface {
Start(string) (*RemoteCommand, error)
Upload(string, io.Reader)
Download(string, io.Writer)
}
// This struct contains some information about the remote command being
// executed and can be used to wait for it to complete.
type RemoteCommand struct {
Stdin io.Writer
Stdout io.Reader
Stderr io.Reader
ExitStatus int
}
package rpc
import (
"errors"
"github.com/mitchellh/packer/packer"
"io"
"log"
"net"
"net/rpc"
)
// An implementation of packer.Communicator where the communicator is actually
// executed over an RPC connection.
type communicator struct {
client *rpc.Client
}
// CommunicatorServer wraps a packer.Communicator implementation and makes
// it exportable as part of a Golang RPC server.
type CommunicatorServer struct {
c packer.Communicator
}
// RemoteCommandServer wraps a packer.RemoteCommand struct and makes it
// exportable as part of a Golang RPC server.
type RemoteCommandServer struct {
rc *packer.RemoteCommand
}
type CommunicatorStartResponse struct {
StdinAddress string
StdoutAddress string
StderrAddress string
ExitStatusAddress string
}
func Communicator(client *rpc.Client) *communicator {
return &communicator{client}
}
func (c *communicator) Start(cmd string) (rc *packer.RemoteCommand, err error) {
var response CommunicatorStartResponse
err = c.client.Call("Communicator.Start", &cmd, &response)
if err != nil {
return
}
// Connect to the three streams that will handle stdin, stdout,
// and stderr and get net.Conns for them.
stdinC, err := net.Dial("tcp", response.StdinAddress)
if err != nil {
return
}
stdoutC, err := net.Dial("tcp", response.StdoutAddress)
if err != nil {
return
}
stderrC, err := net.Dial("tcp", response.StderrAddress)
if err != nil {
return
}
// Build the response object using the streams we created
rc = &packer.RemoteCommand{
stdinC,
stdoutC,
stderrC,
0,
}
return
}
func (c *CommunicatorServer) Start(cmd *string, reply *CommunicatorStartResponse) (err error) {
// Start executing the command.
command, err := c.c.Start(*cmd)
if err != nil {
return
}
// If we didn't get a proper command... that's not right.
if command == nil {
return errors.New("communicator returned nil remote command")
}
// Next, we need to take the stdin/stdout and start a listener
// for each because the client will connect to us via TCP and use
// that connection as the io.Reader or io.Writer. These exist for
// only a single connection that is persistent.
stdinL := netListenerInRange(portRangeMin, portRangeMax)
stdoutL := netListenerInRange(portRangeMin, portRangeMax)
stderrL := netListenerInRange(portRangeMin, portRangeMax)
go serveSingleCopy("stdin", stdinL, command.Stdin, nil)
go serveSingleCopy("stdout", stdoutL, nil, command.Stdout)
go serveSingleCopy("stderr", stderrL, nil, command.Stderr)
// For the exit status, we use a simple RPC Server that serves
// some of the RemoteComand methods.
server := rpc.NewServer()
//server.RegisterName("RemoteCommand", &RemoteCommandServer{command})
*reply = CommunicatorStartResponse{
stdinL.Addr().String(),
stdoutL.Addr().String(),
stderrL.Addr().String(),
serveSingleConn(server),
}
return
}
func serveSingleCopy(name string, l net.Listener, dst io.Writer, src io.Reader) {
defer l.Close()
conn, err := l.Accept()
if err != nil {
return
}
// The connection is the destination/source that is nil
if dst == nil {
dst = conn
} else {
src = conn
}
written, err := io.Copy(dst, src)
log.Printf("%d bytes written for '%s'", written, name)
if err != nil {
log.Printf("'%s' copy error: %s", name, err.Error())
}
}
package rpc
import (
"bufio"
"cgl.tideland.biz/asserts"
"github.com/mitchellh/packer/packer"
"io"
"net/rpc"
"testing"
)
type testCommunicator struct {
startCalled bool
startCmd string
startIn *io.PipeReader
startOut *io.PipeWriter
startErr *io.PipeWriter
}
func (t *testCommunicator) Start(cmd string) (*packer.RemoteCommand, error) {
t.startCalled = true
t.startCmd = cmd
var stdin *io.PipeWriter
var stdout, stderr *io.PipeReader
t.startIn, stdin = io.Pipe()
stdout, t.startOut = io.Pipe()
stderr, t.startErr = io.Pipe()
rc := &packer.RemoteCommand{
stdin,
stdout,
stderr,
0,
}
return rc, nil
}
func (t *testCommunicator) Upload(string, io.Reader) {}
func (t *testCommunicator) Download(string, io.Writer) {}
func TestCommunicatorRPC(t *testing.T) {
assert := asserts.NewTestingAsserts(t, true)
// Create the interface to test
c := new(testCommunicator)
// Start the server
server := rpc.NewServer()
RegisterCommunicator(server, c)
address := serveSingleConn(server)
// Create the client over RPC and run some methods to verify it works
client, err := rpc.Dial("tcp", address)
assert.Nil(err, "should be able to connect")
// Test Start
remote := Communicator(client)
rc, err := remote.Start("foo")
assert.Nil(err, "should not have an error")
// Test that we can read from stdout
bufOut := bufio.NewReader(rc.Stdout)
c.startOut.Write([]byte("outfoo\n"))
data, err := bufOut.ReadString('\n')
assert.Nil(err, "should have no problem reading stdout")
assert.Equal(data, "outfoo\n", "should be correct stdout")
// Test that we can read from stderr
bufErr := bufio.NewReader(rc.Stderr)
c.startErr.Write([]byte("errfoo\n"))
data, err = bufErr.ReadString('\n')
assert.Nil(err, "should have no problem reading stdout")
assert.Equal(data, "errfoo\n", "should be correct stdout")
// Test that we can write to stdin
bufIn := bufio.NewReader(c.startIn)
rc.Stdin.Write([]byte("infoo\n"))
data, err = bufIn.ReadString('\n')
assert.Nil(err, "should have no problem reading stdin")
assert.Equal(data, "infoo\n", "should be correct stdin")
}
func TestCommunicator_ImplementsCommunicator(t *testing.T) {
//assert := asserts.NewTestingAsserts(t, true)
//var r packer.Communicator
//c := Communicator(nil)
//assert.Implementor(c, &r, "should be a Communicator")
}
......@@ -23,6 +23,12 @@ func RegisterCommand(s *rpc.Server, c packer.Command) {
s.RegisterName("Command", &CommandServer{c})
}
// Registers the appropriate endpoint on an RPC server to serve a
// Packer Communicator.
func RegisterCommunicator(s *rpc.Server, c packer.Communicator) {
s.RegisterName("Communicator", &CommunicatorServer{c})
}
// Registers the appropriate endpoint on an RPC server to serve a
// Packer Environment
func RegisterEnvironment(s *rpc.Server, e packer.Environment) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment