You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

344 lines
9.2 KiB

package main
import (
"context"
"errors"
"fmt"
"github.com/docker/go-plugins-helpers/volume"
"github.com/ramr/go-reaper"
log "github.com/sirupsen/logrus"
"io/ioutil"
"os"
"os/exec"
"path"
"strconv"
"strings"
"syscall"
"time"
)
const socketAddress = "/run/docker/plugins/lizardfs310.sock"
const containerVolumePath = "/mnt/docker-volumes"
const hostVolumePath = "/mnt/docker-volumes"
const volumeRoot = "/mnt/lizardfs/"
var host = os.Getenv("HOST")
var port = os.Getenv("PORT")
var remotePath = os.Getenv("REMOTE_PATH")
var mountOptions = os.Getenv("MOUNT_OPTIONS")
var rootVolumeName = os.Getenv("ROOT_VOLUME_NAME")
var connectTimeoutStr = os.Getenv("CONNECT_TIMEOUT")
var connectTimeout = 3000
var mounted = make(map[string][]string)
type lizardfsVolume struct {
Name string
Goal int
Path string
}
type lizardfsDriver struct {
volumes map[string]*lizardfsVolume
statePath string
}
func (l lizardfsDriver) Create(request *volume.CreateRequest) error {
log.WithField("method", "create").Debugf("%#v", l)
volumeName := request.Name
volumePath := fmt.Sprintf("%s%s", volumeRoot, volumeName)
replicationGoal := request.Options["ReplicationGoal"]
if volumeName == rootVolumeName {
log.Warning("tried to create a volume with same name as root volume. Ignoring request.")
}
errs := make(chan error, 1)
go func() {
err := os.MkdirAll(volumePath, 760)
errs <- err
}()
select {
case err := <-errs:
if err != nil {
return err
}
case <-time.After(time.Duration(connectTimeout) * time.Millisecond):
return errors.New("create operation timeout")
}
_, err := strconv.Atoi(replicationGoal)
if err == nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(connectTimeout)*time.Millisecond)
defer cancel()
cmd := exec.CommandContext(ctx, "lizardfs", "setgoal", "-r", replicationGoal, volumePath)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 1}
err = cmd.Start()
if err != nil {
return err
}
err = cmd.Wait()
if err != nil {
log.Error(err)
}
}
return nil
}
func (l lizardfsDriver) List() (*volume.ListResponse, error) {
log.WithField("method", "list").Debugf("")
volumes := make(chan []*volume.Volume, 1)
errs := make(chan error, 1)
go func() {
var vols []*volume.Volume
directories, err := ioutil.ReadDir(volumeRoot)
if err != nil {
errs <- err
}
for _, directory := range directories {
if len(mounted[directory.Name()]) == 0 {
vols = append(vols, &volume.Volume{Name: directory.Name()})
} else {
vols = append(vols, &volume.Volume{Name: directory.Name(), Mountpoint: path.Join(hostVolumePath, directory.Name())})
}
}
if rootVolumeName != "" {
if len(mounted[rootVolumeName]) == 0 {
vols = append(vols, &volume.Volume{Name: rootVolumeName})
} else {
vols = append(vols, &volume.Volume{Name: rootVolumeName, Mountpoint: path.Join(hostVolumePath, rootVolumeName)})
}
}
volumes <- vols
}()
select {
case res := <-volumes:
return &volume.ListResponse{Volumes: res}, nil
case err := <-errs:
return nil, err
case <-time.After(time.Duration(connectTimeout) * time.Millisecond):
return nil, errors.New("list operation timeout")
}
}
func (l lizardfsDriver) Get(request *volume.GetRequest) (*volume.GetResponse, error) {
log.WithField("method", "get").Debugf("")
volumeName := request.Name
volumePath := volumeRoot
if volumeName != rootVolumeName {
volumePath = fmt.Sprintf("%s%s", volumeRoot, volumeName)
}
errs := make(chan error, 1)
go func() {
if _, err := os.Stat(volumePath); os.IsNotExist(err) {
errs <- err
} else {
errs <- nil
}
}()
select {
case err := <-errs:
if err != nil {
return nil, err
} else {
return &volume.GetResponse{Volume: &volume.Volume{Name: volumeName, Mountpoint: volumePath}}, nil
}
case <-time.After(time.Duration(connectTimeout) * time.Millisecond):
return nil, errors.New("get operation timeout")
}
}
func (l lizardfsDriver) Remove(request *volume.RemoveRequest) error {
log.WithField("method", "remove").Debugf("")
volumeName := request.Name
volumePath := fmt.Sprintf("%s%s", volumeRoot, volumeName)
if volumeName == rootVolumeName {
return fmt.Errorf("can't remove root volume %s", rootVolumeName)
}
err := os.RemoveAll(volumePath)
return err
}
func (l lizardfsDriver) Path(request *volume.PathRequest) (*volume.PathResponse, error) {
log.WithField("method", "path").Debugf("")
var volumeName = request.Name
var hostMountpoint = path.Join(hostVolumePath, volumeName)
if len(mounted[volumeName]) == 0 {
return &volume.PathResponse{Mountpoint: hostMountpoint}, nil
}
return &volume.PathResponse{}, nil
}
func (l lizardfsDriver) Mount(request *volume.MountRequest) (*volume.MountResponse, error) {
log.WithField("method", "mount").Debugf("")
var volumeName = request.Name
var mountID = request.ID
var containerMountpoint = path.Join(containerVolumePath, volumeName)
var hostMountpoint = path.Join(hostVolumePath, volumeName)
if len(mounted[volumeName]) == 0 {
err := os.MkdirAll(containerMountpoint, 760)
if err != nil && err != os.ErrExist {
return nil, err
}
mountRemotePath := remotePath
if volumeName != rootVolumeName {
mountRemotePath = path.Join(remotePath, volumeName)
}
params := []string{ "-o", "mfsmaster="+host, "-o", "mfsport="+port, "-o", "mfssubfolder="+mountRemotePath}
if mountOptions != "" {
params = append(params, strings.Split(mountOptions, " ")...)
}
params = append(params, []string{containerMountpoint}...)
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(connectTimeout)*time.Millisecond)
defer cancel()
cmd := exec.CommandContext(ctx, "mfsmount", params...)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 1}
err = cmd.Start()
if err != nil {
return nil, err
}
err = cmd.Wait()
if err != nil {
log.Error(err)
}
mounted[volumeName] = append(mounted[volumeName], mountID)
return &volume.MountResponse{Mountpoint: hostMountpoint}, nil
} else {
return &volume.MountResponse{Mountpoint: hostMountpoint}, nil
}
}
func indexOf(word string, data []string) int {
for k, v := range data {
if word == v {
return k
}
}
return -1
}
func (l lizardfsDriver) Unmount(request *volume.UnmountRequest) error {
log.WithField("method", "unmount").Debugf("")
var volumeName = request.Name
var mountID = request.ID
var containerMountpoint = path.Join(containerVolumePath, volumeName)
index := indexOf(mountID, mounted[volumeName])
if index > -1 {
mounted[volumeName] = append(mounted[volumeName][:index], mounted[volumeName][index+1:]...)
}
if len(mounted[volumeName]) == 0 {
output, err := exec.Command("umount", containerMountpoint).CombinedOutput()
if err != nil {
log.Error(string(output))
return err
}
log.Debug(string(output))
return nil
}
return nil
}
func (l lizardfsDriver) Capabilities() *volume.CapabilitiesResponse {
log.WithField("method", "capabilities").Debugf("")
return &volume.CapabilitiesResponse{Capabilities: volume.Capability{Scope: "global"}}
}
func newLizardfsDriver(root string) (*lizardfsDriver, error) {
log.WithField("method", "new driver").Debug(root)
d := &lizardfsDriver{
volumes: map[string]*lizardfsVolume{},
}
return d, nil
}
func initClient() {
log.WithField("host", host).WithField("port", port).WithField("remote path", remotePath).Info("initializing client")
err := os.MkdirAll(volumeRoot, 760)
if err != nil {
log.Error(err)
}
params := []string{"-o", "mfsmaster="+host, "-o", "mfsport="+port, "-o", "mfssubfolder="+remotePath}
if mountOptions != "" {
params = append(params, strings.Split(mountOptions, " ")...)
}
params = append(params, []string{volumeRoot}...)
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(connectTimeout)*time.Millisecond)
defer cancel()
output, err := exec.CommandContext(ctx, "mfsmount", params...).CombinedOutput()
if err != nil {
log.Error(string(output))
log.Fatal(err)
}
log.Debug(string(output))
}
func startReaperWorker() {
// See related issue in go-reaper https://github.com/ramr/go-reaper/issues/11
if _, hasReaper := os.LookupEnv("REAPER"); !hasReaper {
go reaper.Reap()
args := append(os.Args, "#worker")
pwd, err := os.Getwd()
if err != nil {
panic(err)
}
workerEnv := []string{fmt.Sprintf("REAPER=%d", os.Getpid())}
var wstatus syscall.WaitStatus
pattrs := &syscall.ProcAttr{
Dir: pwd,
Env: append(os.Environ(), workerEnv...),
Sys: &syscall.SysProcAttr{Setsid: true},
Files: []uintptr{0, 1, 2},
}
workerPid, _ := syscall.ForkExec(args[0], args, pattrs)
_, err = syscall.Wait4(workerPid, &wstatus, 0, nil)
for syscall.EINTR == err {
_, err = syscall.Wait4(workerPid, &wstatus, 0, nil)
}
}
}
func main() {
logLevel, err := log.ParseLevel(os.Getenv("LOG_LEVEL"))
if err != nil {
log.SetLevel(log.InfoLevel)
} else {
log.SetLevel(logLevel)
}
log.Debugf("log level set to %s", log.GetLevel())
startReaperWorker()
connectTimeout, err = strconv.Atoi(connectTimeoutStr)
if err != nil {
log.Errorf("failed to parse timeout with error %v. Assuming default %v", err, connectTimeout)
}
initClient()
d, err := newLizardfsDriver("/mnt")
if err != nil {
log.Fatal(err)
}
h := volume.NewHandler(d)
log.Infof("listening on %s", socketAddress)
log.Error(h.ServeUnix(socketAddress, 0))
}