// Copyright 2018 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
	"fmt"
	"path/filepath"
	"strings"

	specs "github.com/opencontainers/runtime-spec/specs-go"
)

// volumeKeyPrefix is the prefix shared by all volume annotation keys, which
// have the form dev.gvisor.spec.mount.NAME.FIELD.
const volumeKeyPrefix = "dev.gvisor.spec.mount."

// kubeletPodsDir is the directory under which volumePath and isVolumePath
// look for pod volumes, laid out as <uid>/volumes/<dir>/<volume>.
// NOTE(review): presumably a var rather than a const so that tests can
// redirect it -- confirm against the package's tests.
var kubeletPodsDir = "/var/lib/kubelet/pods"

// volumeName extracts the volume name from a volume annotation key: the text
// between the key prefix and the next dot. For example, the key
//
//	dev.gvisor.spec.mount.NAME.share
//
// yields NAME. If no dot follows the prefix, everything after the prefix is
// returned.
func volumeName(k string) string {
	rest := strings.TrimPrefix(k, volumeKeyPrefix)
	if dot := strings.IndexByte(rest, '.'); dot >= 0 {
		return rest[:dot]
	}
	return rest
}

// volumeFieldName returns the field part of a volume annotation key, i.e. the
// text after the final dot once the key prefix has been removed. For example,
// the field of
//
//	dev.gvisor.spec.mount.NAME.type
//
// is type.
func volumeFieldName(k string) string {
	rest := strings.TrimPrefix(k, volumeKeyPrefix)
	// LastIndexByte returns -1 when there is no dot, so the slice then starts
	// at 0 and the whole remainder is returned.
	return rest[strings.LastIndexByte(rest, '.')+1:]
}

// podUID derives the pod UID from the sandbox log directory annotation.
//
// The last path element of the annotation value is split on "_". Two layouts
// are accepted:
//   - one field: the old CRI logging path, where the element is the UID;
//   - three fields: the new CRI logging path, where the UID is the last field.
//
// An error is returned when the annotation is missing or empty, or when the
// element has any other number of fields.
func podUID(s *specs.Spec) (string, error) {
	logDir := s.Annotations[sandboxLogDirAnnotation]
	if logDir == "" {
		return "", fmt.Errorf("no sandbox log path annotation")
	}
	parts := strings.Split(filepath.Base(logDir), "_")
	// In both accepted layouts the UID is the final field.
	if n := len(parts); n == 1 || n == 3 {
		return parts[n-1], nil
	}
	return "", fmt.Errorf("unexpected sandbox log path %q", logDir)
}

// isVolumeKey reports whether k is a volume annotation key, i.e. whether it
// starts with volumeKeyPrefix.
func isVolumeKey(k string) bool {
	hasPrefix := strings.HasPrefix(k, volumeKeyPrefix)
	return hasPrefix
}

// volumeSourceKey builds the annotation key that holds the source path of the
// named volume: volumeKeyPrefix, then the volume name, then ".source".
func volumeSourceKey(volume string) string {
	const sourceSuffix = ".source"
	return volumeKeyPrefix + volume + sourceSuffix
}

// volumePath resolves the on-disk directory of a pod volume by globbing
// kubeletPodsDir/<uid>/volumes/*/<volume>.
//
// Exactly one match is required: zero or several matches produce an error
// listing what was found. A malformed glob pattern is reported as the error
// returned by filepath.Glob.
func volumePath(volume, uid string) (string, error) {
	// TODO: Support subpath when gvisor supports pod volume bind mount.
	pattern := fmt.Sprintf("%s/%s/volumes/*/%s", kubeletPodsDir, uid, volume)
	matches, err := filepath.Glob(pattern)
	switch {
	case err != nil:
		return "", err
	case len(matches) == 1:
		return matches[0], nil
	default:
		return "", fmt.Errorf("unexpected matched volume list %v", matches)
	}
}

// isVolumePath reports whether path matches the pattern
// kubeletPodsDir/*/volumes/*/<volume>, i.e. whether it names the given volume
// under any pod directory. The error is whatever filepath.Match returns for
// the pattern.
func isVolumePath(volume, path string) (bool, error) {
	// TODO: Support subpath when gvisor supports pod volume bind mount.
	pattern := fmt.Sprintf("%s/*/volumes/*/%s", kubeletPodsDir, volume)
	matched, err := filepath.Match(pattern, path)
	return matched, err
}

// UpdateVolumeAnnotations adds the OCI annotations needed for gVisor volume
// optimization and reports whether the spec was modified.
//
// Only volume annotations whose field is "type" are considered. For each one:
//   - in a sandbox spec, the volume's directory is looked up with volumePath
//     and stored under the volume's ".source" annotation key;
//   - in a container spec, every mount whose Source matches the volume gets
//     its type changed to the annotation's value.
//
// If the spec is a sandbox but its pod UID cannot be determined, nothing is
// changed and (false, nil) is returned. An error is returned only when the
// path lookup for a sandbox volume fails.
func UpdateVolumeAnnotations(s *specs.Spec) (bool, error) {
	uid := ""
	if IsSandbox(s) {
		id, err := podUID(s)
		if err != nil {
			// Skip if we can't get pod UID, because this doesn't work
			// for containerd 1.1.
			return false, nil
		}
		uid = id
	}

	modified := false
	for key, mountType := range s.Annotations {
		if !isVolumeKey(key) || volumeFieldName(key) != "type" {
			continue
		}
		volume := volumeName(key)

		if uid == "" {
			// This is a container.
			//
			// An error is returned for sandbox if source annotation is not
			// successfully applied, so it is guaranteed that the source annotation
			// for sandbox has already been successfully applied at this point.
			//
			// The volume name is unique inside a pod, so matching without podUID
			// is fine here.
			//
			// TODO: Pass podUID down to shim for containers to do more accurate
			// matching.
			for i := range s.Mounts {
				mnt := &s.Mounts[i]
				// A pattern error from isVolumePath is discarded and treated as
				// "no match".
				matched, _ := isVolumePath(volume, mnt.Source)
				if !matched {
					continue
				}
				// Container mount type must match the sandbox's mount type.
				changeMountType(mnt, mountType)
				modified = true
			}
			continue
		}

		// This is a sandbox.
		src, err := volumePath(volume, uid)
		if err != nil {
			return false, fmt.Errorf("get volume path for %q: %w", volume, err)
		}
		// Inserting while ranging over the map is allowed in Go; if the new key
		// is visited later it is skipped because its field is "source".
		s.Annotations[volumeSourceKey(volume)] = src
		modified = true
	}
	return modified, nil
}

// changeMountType sets m.Type to newType. Unless newType is "bind", it also
// replaces m.Options with a newly allocated slice that omits every "bind" and
// "rbind" entry; the remaining options keep their order.
//
// The options are filtered because the OCI spec allows a bind mount to be
// requested through options alone:
//
//	"For bind mounts (when options include either bind or rbind), the type is
//	a dummy, often "none" (not listed in /proc/filesystems)."
func changeMountType(m *specs.Mount, newType string) {
	m.Type = newType
	if newType == "bind" {
		// Bind options remain meaningful for a bind mount; leave them alone.
		return
	}

	kept := make([]string, 0, len(m.Options))
	for _, o := range m.Options {
		switch o {
		case "bind", "rbind":
			continue
		}
		kept = append(kept, o)
	}
	m.Options = kept
}