You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

344 lines
9.2 KiB

3 years ago
  1. package main
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/docker/go-plugins-helpers/volume"
  7. "github.com/ramr/go-reaper"
  8. log "github.com/sirupsen/logrus"
  9. "io/ioutil"
  10. "os"
  11. "os/exec"
  12. "path"
  13. "strconv"
  14. "strings"
  15. "syscall"
  16. "time"
  17. )
  18. const socketAddress = "/run/docker/plugins/lizardfs310.sock"
  19. const containerVolumePath = "/mnt/docker-volumes"
  20. const hostVolumePath = "/mnt/docker-volumes"
  21. const volumeRoot = "/mnt/lizardfs/"
  22. var host = os.Getenv("HOST")
  23. var port = os.Getenv("PORT")
  24. var remotePath = os.Getenv("REMOTE_PATH")
  25. var mountOptions = os.Getenv("MOUNT_OPTIONS")
  26. var rootVolumeName = os.Getenv("ROOT_VOLUME_NAME")
  27. var connectTimeoutStr = os.Getenv("CONNECT_TIMEOUT")
  28. var connectTimeout = 3000
  29. var mounted = make(map[string][]string)
  30. type lizardfsVolume struct {
  31. Name string
  32. Goal int
  33. Path string
  34. }
  35. type lizardfsDriver struct {
  36. volumes map[string]*lizardfsVolume
  37. statePath string
  38. }
  39. func (l lizardfsDriver) Create(request *volume.CreateRequest) error {
  40. log.WithField("method", "create").Debugf("%#v", l)
  41. volumeName := request.Name
  42. volumePath := fmt.Sprintf("%s%s", volumeRoot, volumeName)
  43. replicationGoal := request.Options["ReplicationGoal"]
  44. if volumeName == rootVolumeName {
  45. log.Warning("tried to create a volume with same name as root volume. Ignoring request.")
  46. }
  47. errs := make(chan error, 1)
  48. go func() {
  49. err := os.MkdirAll(volumePath, 760)
  50. errs <- err
  51. }()
  52. select {
  53. case err := <-errs:
  54. if err != nil {
  55. return err
  56. }
  57. case <-time.After(time.Duration(connectTimeout) * time.Millisecond):
  58. return errors.New("create operation timeout")
  59. }
  60. _, err := strconv.Atoi(replicationGoal)
  61. if err == nil {
  62. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(connectTimeout)*time.Millisecond)
  63. defer cancel()
  64. cmd := exec.CommandContext(ctx, "lizardfs", "setgoal", "-r", replicationGoal, volumePath)
  65. cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 1}
  66. err = cmd.Start()
  67. if err != nil {
  68. return err
  69. }
  70. err = cmd.Wait()
  71. if err != nil {
  72. log.Error(err)
  73. }
  74. }
  75. return nil
  76. }
  77. func (l lizardfsDriver) List() (*volume.ListResponse, error) {
  78. log.WithField("method", "list").Debugf("")
  79. volumes := make(chan []*volume.Volume, 1)
  80. errs := make(chan error, 1)
  81. go func() {
  82. var vols []*volume.Volume
  83. directories, err := ioutil.ReadDir(volumeRoot)
  84. if err != nil {
  85. errs <- err
  86. }
  87. for _, directory := range directories {
  88. if len(mounted[directory.Name()]) == 0 {
  89. vols = append(vols, &volume.Volume{Name: directory.Name()})
  90. } else {
  91. vols = append(vols, &volume.Volume{Name: directory.Name(), Mountpoint: path.Join(hostVolumePath, directory.Name())})
  92. }
  93. }
  94. if rootVolumeName != "" {
  95. if len(mounted[rootVolumeName]) == 0 {
  96. vols = append(vols, &volume.Volume{Name: rootVolumeName})
  97. } else {
  98. vols = append(vols, &volume.Volume{Name: rootVolumeName, Mountpoint: path.Join(hostVolumePath, rootVolumeName)})
  99. }
  100. }
  101. volumes <- vols
  102. }()
  103. select {
  104. case res := <-volumes:
  105. return &volume.ListResponse{Volumes: res}, nil
  106. case err := <-errs:
  107. return nil, err
  108. case <-time.After(time.Duration(connectTimeout) * time.Millisecond):
  109. return nil, errors.New("list operation timeout")
  110. }
  111. }
  112. func (l lizardfsDriver) Get(request *volume.GetRequest) (*volume.GetResponse, error) {
  113. log.WithField("method", "get").Debugf("")
  114. volumeName := request.Name
  115. volumePath := volumeRoot
  116. if volumeName != rootVolumeName {
  117. volumePath = fmt.Sprintf("%s%s", volumeRoot, volumeName)
  118. }
  119. errs := make(chan error, 1)
  120. go func() {
  121. if _, err := os.Stat(volumePath); os.IsNotExist(err) {
  122. errs <- err
  123. } else {
  124. errs <- nil
  125. }
  126. }()
  127. select {
  128. case err := <-errs:
  129. if err != nil {
  130. return nil, err
  131. } else {
  132. return &volume.GetResponse{Volume: &volume.Volume{Name: volumeName, Mountpoint: volumePath}}, nil
  133. }
  134. case <-time.After(time.Duration(connectTimeout) * time.Millisecond):
  135. return nil, errors.New("get operation timeout")
  136. }
  137. }
  138. func (l lizardfsDriver) Remove(request *volume.RemoveRequest) error {
  139. log.WithField("method", "remove").Debugf("")
  140. volumeName := request.Name
  141. volumePath := fmt.Sprintf("%s%s", volumeRoot, volumeName)
  142. if volumeName == rootVolumeName {
  143. return fmt.Errorf("can't remove root volume %s", rootVolumeName)
  144. }
  145. err := os.RemoveAll(volumePath)
  146. return err
  147. }
  148. func (l lizardfsDriver) Path(request *volume.PathRequest) (*volume.PathResponse, error) {
  149. log.WithField("method", "path").Debugf("")
  150. var volumeName = request.Name
  151. var hostMountpoint = path.Join(hostVolumePath, volumeName)
  152. if len(mounted[volumeName]) == 0 {
  153. return &volume.PathResponse{Mountpoint: hostMountpoint}, nil
  154. }
  155. return &volume.PathResponse{}, nil
  156. }
  157. func (l lizardfsDriver) Mount(request *volume.MountRequest) (*volume.MountResponse, error) {
  158. log.WithField("method", "mount").Debugf("")
  159. var volumeName = request.Name
  160. var mountID = request.ID
  161. var containerMountpoint = path.Join(containerVolumePath, volumeName)
  162. var hostMountpoint = path.Join(hostVolumePath, volumeName)
  163. if len(mounted[volumeName]) == 0 {
  164. err := os.MkdirAll(containerMountpoint, 760)
  165. if err != nil && err != os.ErrExist {
  166. return nil, err
  167. }
  168. mountRemotePath := remotePath
  169. if volumeName != rootVolumeName {
  170. mountRemotePath = path.Join(remotePath, volumeName)
  171. }
  172. params := []string{ "-o", "mfsmaster="+host, "-o", "mfsport="+port, "-o", "mfssubfolder="+mountRemotePath}
  173. if mountOptions != "" {
  174. params = append(params, strings.Split(mountOptions, " ")...)
  175. }
  176. params = append(params, []string{containerMountpoint}...)
  177. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(connectTimeout)*time.Millisecond)
  178. defer cancel()
  179. cmd := exec.CommandContext(ctx, "mfsmount", params...)
  180. cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 1}
  181. err = cmd.Start()
  182. if err != nil {
  183. return nil, err
  184. }
  185. err = cmd.Wait()
  186. if err != nil {
  187. log.Error(err)
  188. }
  189. mounted[volumeName] = append(mounted[volumeName], mountID)
  190. return &volume.MountResponse{Mountpoint: hostMountpoint}, nil
  191. } else {
  192. return &volume.MountResponse{Mountpoint: hostMountpoint}, nil
  193. }
  194. }
  195. func indexOf(word string, data []string) int {
  196. for k, v := range data {
  197. if word == v {
  198. return k
  199. }
  200. }
  201. return -1
  202. }
  203. func (l lizardfsDriver) Unmount(request *volume.UnmountRequest) error {
  204. log.WithField("method", "unmount").Debugf("")
  205. var volumeName = request.Name
  206. var mountID = request.ID
  207. var containerMountpoint = path.Join(containerVolumePath, volumeName)
  208. index := indexOf(mountID, mounted[volumeName])
  209. if index > -1 {
  210. mounted[volumeName] = append(mounted[volumeName][:index], mounted[volumeName][index+1:]...)
  211. }
  212. if len(mounted[volumeName]) == 0 {
  213. output, err := exec.Command("umount", containerMountpoint).CombinedOutput()
  214. if err != nil {
  215. log.Error(string(output))
  216. return err
  217. }
  218. log.Debug(string(output))
  219. return nil
  220. }
  221. return nil
  222. }
  223. func (l lizardfsDriver) Capabilities() *volume.CapabilitiesResponse {
  224. log.WithField("method", "capabilities").Debugf("")
  225. return &volume.CapabilitiesResponse{Capabilities: volume.Capability{Scope: "global"}}
  226. }
  227. func newLizardfsDriver(root string) (*lizardfsDriver, error) {
  228. log.WithField("method", "new driver").Debug(root)
  229. d := &lizardfsDriver{
  230. volumes: map[string]*lizardfsVolume{},
  231. }
  232. return d, nil
  233. }
  234. func initClient() {
  235. log.WithField("host", host).WithField("port", port).WithField("remote path", remotePath).Info("initializing client")
  236. err := os.MkdirAll(volumeRoot, 760)
  237. if err != nil {
  238. log.Error(err)
  239. }
  240. params := []string{"-o", "mfsmaster="+host, "-o", "mfsport="+port, "-o", "mfssubfolder="+remotePath}
  241. if mountOptions != "" {
  242. params = append(params, strings.Split(mountOptions, " ")...)
  243. }
  244. params = append(params, []string{volumeRoot}...)
  245. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(connectTimeout)*time.Millisecond)
  246. defer cancel()
  247. output, err := exec.CommandContext(ctx, "mfsmount", params...).CombinedOutput()
  248. if err != nil {
  249. log.Error(string(output))
  250. log.Fatal(err)
  251. }
  252. log.Debug(string(output))
  253. }
  254. func startReaperWorker() {
  255. // See related issue in go-reaper https://github.com/ramr/go-reaper/issues/11
  256. if _, hasReaper := os.LookupEnv("REAPER"); !hasReaper {
  257. go reaper.Reap()
  258. args := append(os.Args, "#worker")
  259. pwd, err := os.Getwd()
  260. if err != nil {
  261. panic(err)
  262. }
  263. workerEnv := []string{fmt.Sprintf("REAPER=%d", os.Getpid())}
  264. var wstatus syscall.WaitStatus
  265. pattrs := &syscall.ProcAttr{
  266. Dir: pwd,
  267. Env: append(os.Environ(), workerEnv...),
  268. Sys: &syscall.SysProcAttr{Setsid: true},
  269. Files: []uintptr{0, 1, 2},
  270. }
  271. workerPid, _ := syscall.ForkExec(args[0], args, pattrs)
  272. _, err = syscall.Wait4(workerPid, &wstatus, 0, nil)
  273. for syscall.EINTR == err {
  274. _, err = syscall.Wait4(workerPid, &wstatus, 0, nil)
  275. }
  276. }
  277. }
  278. func main() {
  279. logLevel, err := log.ParseLevel(os.Getenv("LOG_LEVEL"))
  280. if err != nil {
  281. log.SetLevel(log.InfoLevel)
  282. } else {
  283. log.SetLevel(logLevel)
  284. }
  285. log.Debugf("log level set to %s", log.GetLevel())
  286. startReaperWorker()
  287. connectTimeout, err = strconv.Atoi(connectTimeoutStr)
  288. if err != nil {
  289. log.Errorf("failed to parse timeout with error %v. Assuming default %v", err, connectTimeout)
  290. }
  291. initClient()
  292. d, err := newLizardfsDriver("/mnt")
  293. if err != nil {
  294. log.Fatal(err)
  295. }
  296. h := volume.NewHandler(d)
  297. log.Infof("listening on %s", socketAddress)
  298. log.Error(h.ServeUnix(socketAddress, 0))
  299. }