/*
*
* Copyright © 2021-2024 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package array provides structs and methods for configuring connection to PowerStore array.
package array
import (
"context"
"errors"
"fmt"
"net/http"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/dell/csi-powerstore/v2/core"
"github.com/dell/csi-powerstore/v2/pkg/common"
"github.com/dell/csi-powerstore/v2/pkg/common/fs"
csictx "github.com/dell/gocsi/context"
"github.com/dell/gopowerstore"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"gopkg.in/yaml.v3"
)
// IPToArray - Store Array IPs
var IPToArray map[string]string
// Consumer provides methods for safe management of arrays
type Consumer interface {
Arrays() map[string]*PowerStoreArray
SetArrays(map[string]*PowerStoreArray)
DefaultArray() *PowerStoreArray
SetDefaultArray(*PowerStoreArray)
UpdateArrays(string, fs.Interface) error
}
// Locker provides implementation for safe management of arrays
type Locker struct {
arraysLock sync.Mutex
defaultArrayLock sync.Mutex
arrays map[string]*PowerStoreArray
defaultArray *PowerStoreArray
}
// Arrays is a getter for list of arrays
func (s *Locker) Arrays() map[string]*PowerStoreArray {
s.arraysLock.Lock()
defer s.arraysLock.Unlock()
return s.arrays
}
// GetOneArray is a getter for an arrays based on globalID
func (s *Locker) GetOneArray(globalID string) (*PowerStoreArray, error) {
s.arraysLock.Lock()
defer s.arraysLock.Unlock()
if arrayConfig, ok := s.arrays[globalID]; ok {
return arrayConfig, nil
}
log.Errorf("array having globalID %s is not found in cache", globalID)
return nil, fmt.Errorf("array not found")
}
// SetArrays adds an array
func (s *Locker) SetArrays(arrays map[string]*PowerStoreArray) {
s.arraysLock.Lock()
defer s.arraysLock.Unlock()
s.arrays = arrays
}
// DefaultArray is a getter for default array
func (s *Locker) DefaultArray() *PowerStoreArray {
s.defaultArrayLock.Lock()
defer s.defaultArrayLock.Unlock()
return s.defaultArray
}
// SetDefaultArray sets default array
func (s *Locker) SetDefaultArray(array *PowerStoreArray) {
s.defaultArrayLock.Lock()
defer s.defaultArrayLock.Unlock()
s.defaultArray = array
}
// UpdateArrays updates array info
func (s *Locker) UpdateArrays(configPath string, fs fs.Interface) error {
log.Info("updating array info")
arrays, matcher, defaultArray, err := GetPowerStoreArrays(fs, configPath)
if err != nil {
return fmt.Errorf("can't get config for arrays: %s", err.Error())
}
s.SetArrays(arrays)
IPToArray = matcher
s.SetDefaultArray(defaultArray)
return nil
}
// PowerStoreArray is a struct that stores all PowerStore connection information.
// It stores gopowerstore client that can be directly used to invoke PowerStore API calls.
// This structure is supposed to be parsed from config and mainly is created by GetPowerStoreArrays function.
type PowerStoreArray struct {
Endpoint string `yaml:"endpoint"`
GlobalID string `yaml:"globalID"`
Username string `yaml:"username"`
Password string `yaml:"password"`
NasName string `yaml:"nasName"`
BlockProtocol common.TransportType `yaml:"blockProtocol"`
Insecure bool `yaml:"skipCertificateValidation"`
IsDefault bool `yaml:"isDefault"`
NfsAcls string `yaml:"nfsAcls"`
Client gopowerstore.Client
IP string
}
// GetNasName is a getter that returns name of configured NAS
func (psa *PowerStoreArray) GetNasName() string {
return psa.NasName
}
// GetClient is a getter that returns gopowerstore Client interface
func (psa *PowerStoreArray) GetClient() gopowerstore.Client {
return psa.Client
}
// GetIP is a getter that returns IP address of the array
func (psa *PowerStoreArray) GetIP() string {
return psa.IP
}
// GetGlobalID is a getter that returns GlobalID address of the array
func (psa *PowerStoreArray) GetGlobalID() string {
return psa.GlobalID
}
// GetPowerStoreArrays parses config.yaml file, initializes gopowerstore Clients and composes map of arrays for ease of access.
// It will return array that can be used as default as a second return parameter.
// If config does not have any array as a default then the first will be returned as a default.
func GetPowerStoreArrays(fs fs.Interface, filePath string) (map[string]*PowerStoreArray, map[string]string, *PowerStoreArray, error) {
type config struct {
Arrays []*PowerStoreArray `yaml:"arrays"`
}
data, err := fs.ReadFile(filepath.Clean(filePath))
if err != nil {
log.Errorf("cannot read file %s : %s", filePath, err.Error())
return nil, nil, nil, err
}
var cfg config
err = yaml.Unmarshal(data, &cfg)
if err != nil {
log.Errorf("cannot unmarshal data: %s", err.Error())
return nil, nil, nil, err
}
arrayMap := make(map[string]*PowerStoreArray)
mapper := make(map[string]string)
var defaultArray *PowerStoreArray
foundDefault := false
if len(cfg.Arrays) == 0 {
return arrayMap, mapper, defaultArray, nil
}
// Safeguard if user doesn't set any array as default, we just use first one
defaultArray = cfg.Arrays[0]
// Convert to map for convenience and init gopowerstore.Client
for _, array := range cfg.Arrays {
array := array
if array == nil {
return arrayMap, mapper, defaultArray, nil
}
if array.GlobalID == "" {
return nil, nil, nil, errors.New("no GlobalID field found in config.yaml - update config.yaml according to the documentation")
}
clientOptions := gopowerstore.NewClientOptions()
clientOptions.SetInsecure(array.Insecure)
if throttlingRateLimit, ok := csictx.LookupEnv(context.Background(), common.EnvThrottlingRateLimit); ok {
rateLimit, err := strconv.Atoi(throttlingRateLimit)
if err != nil {
log.Errorf("can't get throttling rate limit, using default")
} else if rateLimit < 0 {
log.Errorf("throttling rate limit is negative, using default")
} else {
clientOptions.SetRateLimit(rateLimit)
}
}
c, err := gopowerstore.NewClientWithArgs(
array.Endpoint, array.Username, array.Password, clientOptions)
if err != nil {
return nil, nil, nil, status.Errorf(codes.FailedPrecondition,
"unable to create PowerStore client: %s", err.Error())
}
c.SetCustomHTTPHeaders(http.Header{
"Application-Type": {fmt.Sprintf("%s/%s", common.VerboseName, core.SemVer)},
})
c.SetLogger(&common.CustomLogger{})
array.Client = c
if array.BlockProtocol == "" {
array.BlockProtocol = common.AutoDetectTransport
}
array.BlockProtocol = common.TransportType(strings.ToUpper(string(array.BlockProtocol)))
var ip string
ips := common.GetIPListFromString(array.Endpoint)
if ips == nil {
log.Warnf("didn't found an IP from the provided endPoint, it could be a FQDN. Please make sure to enter a valid FQDN in https://abc.com/api/rest format")
sub := strings.Split(array.Endpoint, "/")
if len(sub) > 2 {
ip = sub[2]
if regexp.MustCompile(`^[0-9.]*$`).MatchString(sub[2]) {
return nil, nil, nil, fmt.Errorf("can't get ips from endpoint: %s", array.Endpoint)
}
} else {
return nil, nil, nil, fmt.Errorf("can't get ips from endpoint: %s", array.Endpoint)
}
} else {
ip = ips[0]
}
array.IP = ip
log.Infof("%s,%s,%s,%s,%t,%t,%s,%s", array.Endpoint, array.GlobalID, array.Username, array.NasName, array.Insecure, array.IsDefault, array.BlockProtocol, ip)
arrayMap[array.GlobalID] = array
mapper[ip] = array.GlobalID
if array.IsDefault && !foundDefault {
defaultArray = array
foundDefault = true
}
}
return arrayMap, mapper, defaultArray, nil
}
// ParseVolumeID parses a volume id from the CO (Kubernetes) and tries to extract the PowerStore volume id, Global ID, and protocol.
//
// Example:
//
// ParseVolumeID("1cd254s/192.168.0.1/scsi") assuming 192.168.0.1 is the IP array PSabc0123def will return
// localVolumeID = "1cd254s"
// arrayID = "PSabc0123def"
// protocol = "scsi"
// e = nil
//
// Example:
//
// ParseVolumeID("9f840c56-96e6-4de9-b5a3-27e7c20eaa77/PSabcdef0123/scsi:9f840c56-96e6-4de9-b5a3-27e7c20eaa77/PS0123abcdef") returns
// localVolumeID = "9f840c56-96e6-4de9-b5a3-27e7c20eaa77"
// arrayID = "PSabcdef0123"
// protocol = "scsi"
// remoteVolumeID = "9f840c56-96e6-4de9-b5a3-27e7c20eaa77"
// remoteArrayID = "PS0123abcdef"
// e = nil
//
// This function is backwards compatible and will try to understand volume protocol even if there is no such information in volume id.
// It will do that by querying default powerstore array passed as one of the arguments
func ParseVolumeID(ctx context.Context, volumeHandle string,
defaultArray *PowerStoreArray, /*legacy support*/
cap *csi.VolumeCapability, /*legacy support*/
) (localVolumeID, arrayID, protocol, remoteVolumeID, remoteArrayID string, e error) {
if volumeHandle == "" {
return "", "", "", "", "", status.Errorf(codes.FailedPrecondition,
"unable to parse volume handle. volumeHandle is empty")
}
// metro volume handles will have a colon separating the local
// volume handle and remote volume handle
// e.g. 9f840c56-96e6-4de9-b5a3-27e7c20eaa77/PSabcdef0123/scsi:9f840c56-96e6-4de9-b5a3-27e7c20eaa77/PS0123abcdef
volumeHandles := strings.Split(volumeHandle, ":")
// parse the first (potentially only) volume handle
localVolumeHandle := strings.Split(volumeHandles[0], "/")
localVolumeID = localVolumeHandle[0]
log.Debugf("vol-id %s", localVolumeHandle)
if len(localVolumeHandle) == 1 {
// Legacy support where the volume name consists of only the volume ID.
// We've got a volume from previous version
// We assume that we should use default array for that
// Try to understand whether it is an nfs or scsi based volume
arrayID = defaultArray.GetGlobalID()
// If we have volume capability in request we can check FsType
if cap != nil && cap.GetMount() != nil {
if cap.GetMount().GetFsType() == "nfs" {
protocol = "nfs"
} else {
protocol = "scsi"
}
} else {
// Try to just find out volume type by querying it's id from array
_, err := defaultArray.GetClient().GetVolume(ctx, localVolumeID)
if err == nil {
protocol = "scsi"
} else {
_, err := defaultArray.GetClient().GetFS(ctx, localVolumeID)
if err == nil {
protocol = "nfs"
} else {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.NotFound() {
return localVolumeID, arrayID, protocol, "", "", apiError
}
return localVolumeID, arrayID, protocol, "", "", status.Errorf(codes.Unknown,
"failure checking volume status: %s", err.Error())
}
}
}
} else {
if ips := common.GetIPListFromString(localVolumeHandle[1]); ips != nil {
// Legacy support where IP is used in the volume name in place of a PowerStore Global ID.
arrayID = IPToArray[ips[0]]
} else {
arrayID = localVolumeHandle[1]
}
protocol = localVolumeHandle[2]
}
// Parse the second portion of a metro volume handle
if len(volumeHandles) > 1 {
remoteVolumeHandle := strings.Split(volumeHandles[1], "/")
remoteVolumeID = remoteVolumeHandle[0]
remoteArrayID = remoteVolumeHandle[1]
}
log.Debugf("id %s arrayID %s proto %s", localVolumeID, arrayID, protocol)
return localVolumeID, arrayID, protocol, remoteVolumeID, remoteArrayID, nil
}
/*
*
* Copyright © 2021-2023 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package common provides common constants, variables and function used in both controller and node services.
package common
import (
"bytes"
"context"
"crypto/rand"
"errors"
"fmt"
"net"
"os"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/apparentlymart/go-cidr/cidr"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/dell/csi-powerstore/v2/core"
"github.com/dell/csi-powerstore/v2/pkg/common/fs"
"github.com/dell/gobrick"
csictx "github.com/dell/gocsi/context"
"github.com/dell/gocsi/utils"
"github.com/dell/gopowerstore"
log "github.com/sirupsen/logrus"
)
// Name contains default name of the driver, can be overridden
var Name = "csi-powerstore.dellemc.com"
// APIPort port for API calls
var APIPort string
// Manifest contains additional information about the driver
var Manifest = map[string]string{
"url": "https://github.com/dell/csi-powerstore",
"semver": core.SemVer,
"commit": core.CommitSha32,
"formed": core.CommitTime.Format(time.RFC1123),
}
type key int
// ArrayConnectivityStatus Status of the array probe
type ArrayConnectivityStatus struct {
LastSuccess int64 `json:"lastSuccess"` // connectivity status
LastAttempt int64 `json:"lastAttempt"` // last timestamp attempted to check connectivity
}
const (
// KeyAllowRoot key value to check if driver should enable root squashing for nfs volumes
KeyAllowRoot = "allowRoot"
// KeyNfsExportPath key value to pass in publish context
KeyNfsExportPath = "NfsExportPath"
// KeyHostIP key value to pass in publish context
KeyHostIP = "HostIP"
// KeyExportID key value to pass in publish context
KeyExportID = "ExportID"
// KeyNatIP key value to pass in publish context
KeyNatIP = "NatIP"
// KeyArrayID key value to check in request parameters for array ip
KeyArrayID = "arrayID"
// KeyArrayVolumeName key value to check in request parameters for volume name
KeyArrayVolumeName = "Name"
// KeyProtocol key value to check in request parameters for volume name
KeyProtocol = "Protocol"
// KeyNfsACL key value to specify NFS ACLs for NFS volume
KeyNfsACL = "nfsAcls"
// KeyNasName key value to specify NAS server name
KeyNasName = "nasName"
// KeyVolumeDescription key value to specify volume description
KeyVolumeDescription = "csi.dell.com/description"
// KeyApplianceID key value to specify appliance_id
KeyApplianceID = "csi.dell.com/appliance_id"
// KeyProtectionPolicyID key value to specify protection_policy_id
KeyProtectionPolicyID = "csi.dell.com/protection_policy_id"
// KeyPerformancePolicyID key value to specify performance_policy_id
KeyPerformancePolicyID = "csi.dell.com/performance_policy_id"
// KeyAppType key value to specify app_type
KeyAppType = "csi.dell.com/app_type"
// KeyAppTypeOther key value to specify app_type_other
KeyAppTypeOther = "csi.dell.com/app_type_other"
// KeyConfigType key value to specify volume config_type
KeyConfigType = "csi.dell.com/config_type"
// KeyAccessPolicy key value to specify volume access_policy
KeyAccessPolicy = "csi.dell.com/access_policy"
// KeyLockingPolicy key value to specify volume locking_policy
KeyLockingPolicy = "csi.dell.com/locking_policy"
// KeyFolderRenamePolicy key value to specify volume folder_rename_policy
KeyFolderRenamePolicy = "csi.dell.com/folder_rename_policy"
// KeyIsAsyncMtimeEnabled key value to specify volume is_async_mtime_enabled
KeyIsAsyncMtimeEnabled = "csi.dell.com/is_async_mtime_enabled"
// KeyFileEventsPublishingMode key value to specify volume file_events_publishing_mode
KeyFileEventsPublishingMode = "csi.dell.com/file_events_publishing_mode"
// KeyHostIoSize key value to specify volume host_io_size
KeyHostIoSize = "csi.dell.com/host_io_size"
// KeyVolumeGroupID key value to specify volume_group_id
KeyVolumeGroupID = "csi.dell.com/volume_group_id"
// KeyFlrCreateMode key value to specify flr_attributes.flr_create.mode
KeyFlrCreateMode = "csi.dell.com/flr_attributes.flr_create.mode"
// KeyFlrDefaultRetention key value to specify flr_attributes.flr_create.default_retention
KeyFlrDefaultRetention = "csi.dell.com/flr_attributes.flr_create.default_retention"
// KeyFlrMinRetention key value to specify flr_attributes.flr_create.minimum_retention
KeyFlrMinRetention = "csi.dell.com/flr_attributes.flr_create.minimum_retention"
// KeyFlrMaxRetention key value to specify flr_attributes.flr_create.maximum_retention
KeyFlrMaxRetention = "csi.dell.com/flr_attributes.flr_create.maximum_retention"
// KeyServiceTag has the service tag associated to an Appliance
KeyServiceTag = "serviceTag"
// VerboseName longer description of the driver
VerboseName = "CSI Driver for Dell EMC PowerStore"
// FcTransport indicates that FC is chosen as a SCSI transport protocol
FcTransport TransportType = "FC"
// ISCSITransport indicates that ISCSI is chosen as a SCSI transport protocol
ISCSITransport TransportType = "ISCSI"
// AutoDetectTransport indicates that SCSI transport protocol would be detected automatically
AutoDetectTransport TransportType = "AUTO"
// NoneTransport indicates that no SCSI transport protocol needed
NoneTransport TransportType = "NONE"
// PublishContextDeviceWWN indicates publish context device wwn
PublishContextDeviceWWN = "DEVICE_WWN"
// PublishContextLUNAddress indicates publish context LUN address
PublishContextLUNAddress = "LUN_ADDRESS"
// PublishContextISCSIPortalsPrefix indicates publish context iSCSI portals prefix
PublishContextISCSIPortalsPrefix = "PORTAL"
// PublishContextISCSITargetsPrefix indicates publish context iSCSI targets prefix
PublishContextISCSITargetsPrefix = "TARGET"
// PublishContextNVMETCPPortalsPrefix indicates publish context NVMeTCP portals prefix
PublishContextNVMETCPPortalsPrefix = "NVMETCPPORTAL"
// PublishContextNVMETCPTargetsPrefix indicates publish context NVMe targets prefix
PublishContextNVMETCPTargetsPrefix = "NVMETCPTARGET"
// PublishContextNVMEFCPortalsPrefix indicates publish context NVMe targets prefix
PublishContextNVMEFCPortalsPrefix = "NVMEFCPORTAL"
// PublishContextNVMEFCTargetsPrefix indicates publish context NVMe targets prefix
PublishContextNVMEFCTargetsPrefix = "NVMEFCTARGET"
// NVMETCPTransport indicates that NVMe/TCP is chosen as the transport protocol
NVMETCPTransport TransportType = "NVMETCP"
// NVMEFCTransport indicates that NVMe/FC is chosen as the transport protocol
NVMEFCTransport TransportType = "NVMEFC"
// PublishContextFCWWPNPrefix indicates publish context FC WWPN prefix
PublishContextFCWWPNPrefix = "FCWWPN"
// WWNPrefix indicates WWN prefix
WWNPrefix = "naa."
contextLogFieldsKey key = iota
// DefaultPodmonAPIPortNumber is the port number in default to expose internal health APIs
DefaultPodmonAPIPortNumber = "8083"
// DefaultPodmonPollRate is the default polling frequency to check for array connectivity
DefaultPodmonPollRate = 60
// Timeout for making http requests
Timeout = time.Second * 5
// ArrayStatus is the endPoint for polling to check array status
ArrayStatus = "/array-status"
)
// TransportType differentiates different SCSI transport protocols (FC, iSCSI, Auto, None)
type TransportType string
// RmSockFile removes socket files that left after previous installation
func RmSockFile(f fs.Interface) {
proto, addr, err := utils.GetCSIEndpoint()
if err != nil {
log.Errorf("Error: failed to get CSI endpoint: %s\n", err.Error())
}
var rmSockFileOnce sync.Once
rmSockFileOnce.Do(func() {
if proto == "unix" {
if _, err := f.Stat(addr); err == nil {
if err = f.RemoveAll(addr); err != nil {
log.Errorf("Error: failed to remove socket file %s: %s\n", addr, err.Error())
}
log.Infof("removed socket file %s\n", addr)
} else if os.IsNotExist(err) {
return
} else {
log.Errorf("Error: socket file %s may or may not exist: %s\n", addr, err.Error())
}
}
})
}
// GetIPListFromString returns list of ips in string form found in input string
// A return value of nil indicates no match
func GetIPListFromString(input string) []string {
re := regexp.MustCompile(`(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)){3}`)
return re.FindAllString(input, -1)
}
func parseMask(ipaddr string) (mask string, err error) {
removeExtra := regexp.MustCompile("^(.*[\\/])")
asd := ipaddr[len(ipaddr)-3:]
findSubnet := removeExtra.ReplaceAll([]byte(asd), []byte(""))
subnet, err := strconv.ParseInt(string(findSubnet), 10, 64)
if err != nil {
return "", errors.New("Parse Mask: Error parsing mask")
}
if subnet < 0 || subnet > 32 {
return "", errors.New("Invalid subnet mask")
}
var buff bytes.Buffer
for i := 0; i < int(subnet); i++ {
buff.WriteString("1")
}
for i := subnet; i < 32; i++ {
buff.WriteString("0")
}
masker := buff.String()
a, _ := strconv.ParseUint(masker[:8], 2, 64)
b, _ := strconv.ParseUint(masker[8:16], 2, 64)
c, _ := strconv.ParseUint(masker[16:24], 2, 64)
d, _ := strconv.ParseUint(masker[24:32], 2, 64)
resultMask := fmt.Sprintf("%v.%v.%v.%v", a, b, c, d)
return resultMask, nil
}
// GetIPListWithMaskFromString returns ip and mask in string form found in input string
// A return value of nil indicates no match
func GetIPListWithMaskFromString(input string) (string, error) {
// Split the IP address and subnet mask if present
parts := strings.Split(input, "/")
ip := parts[0]
result := net.ParseIP(ip)
if result == nil {
return "", errors.New("doesn't seem to be a valid IP")
}
if len(parts) > 1 {
// ideally there will be only 2 substrings for a valid IP/SubnetMask
if len(parts) > 2 {
return "", errors.New("doesn't seem to be a valid IP")
}
mask, err := parseMask(input)
if err != nil {
return "", errors.New("doesn't seem to be a valid IP")
}
ip = ip + "/" + mask
}
return ip, nil
}
// SetLogFields returns modified context with fields inserted as values by using contextLogFieldsKey key
func SetLogFields(ctx context.Context, fields log.Fields) context.Context {
if ctx == nil {
ctx = context.Background()
}
return context.WithValue(ctx, contextLogFieldsKey, fields)
}
// RandomString returns a random string of specified length.
// String is generated by using crypto/rand.
func RandomString(len int) string {
b := make([]byte, len)
_, err := rand.Read(b)
if err != nil {
log.Errorf("Can't generate random string; error = %v", err)
}
suff := fmt.Sprintf("%x", b[0:])
return suff
}
// GetLogFields extracts log fields from context by using contextLogFieldsKey key
func GetLogFields(ctx context.Context) log.Fields {
if ctx == nil {
return log.Fields{}
}
fields, ok := ctx.Value(contextLogFieldsKey).(log.Fields)
if !ok {
fields = log.Fields{}
}
csiReqID, ok := ctx.Value(csictx.RequestIDKey).(string)
if !ok {
return fields
}
fields["RequestID"] = csiReqID
return fields
}
// GetISCSITargetsInfoFromStorage returns list of gobrick compatible iscsi targets by querying PowerStore array
func GetISCSITargetsInfoFromStorage(client gopowerstore.Client, volumeApplianceID string) ([]gobrick.ISCSITargetInfo, error) {
addrInfo, err := client.GetStorageISCSITargetAddresses(context.Background())
if err != nil {
log.Error(err.Error())
return []gobrick.ISCSITargetInfo{}, err
}
// sort data by id
sort.Slice(addrInfo, func(i, j int) bool {
return addrInfo[i].ID < addrInfo[j].ID
})
var result []gobrick.ISCSITargetInfo
for _, t := range addrInfo {
// volumeApplianceID will be empty in case the call is from NodeGetInfo
if t.ApplianceID == volumeApplianceID || volumeApplianceID == "" {
result = append(result, gobrick.ISCSITargetInfo{Target: t.IPPort.TargetIqn, Portal: fmt.Sprintf("%s:3260", t.Address)})
}
}
return result, nil
}
// GetNVMETCPTargetsInfoFromStorage returns list of gobrick compatible NVME TCP targets by querying PowerStore array
func GetNVMETCPTargetsInfoFromStorage(client gopowerstore.Client, volumeApplianceID string) ([]gobrick.NVMeTargetInfo, error) {
clusterInfo, err := client.GetCluster(context.Background())
nvmeNQN := clusterInfo.NVMeNQN
addrInfo, err := client.GetStorageNVMETCPTargetAddresses(context.Background())
if err != nil {
log.Error(err.Error())
return []gobrick.NVMeTargetInfo{}, err
}
// sort data by id
sort.Slice(addrInfo, func(i, j int) bool {
return addrInfo[i].ID < addrInfo[j].ID
})
var result []gobrick.NVMeTargetInfo
for _, t := range addrInfo {
// volumeApplianceID will be empty in case the call is from NodeGetInfo
if t.ApplianceID == volumeApplianceID || volumeApplianceID == "" {
result = append(result, gobrick.NVMeTargetInfo{Target: nvmeNQN, Portal: fmt.Sprintf("%s:4420", t.Address)})
}
}
return result, nil
}
// GetFCTargetsInfoFromStorage returns list of gobrick compatible FC targets by querying PowerStore array
func GetFCTargetsInfoFromStorage(client gopowerstore.Client, volumeApplianceID string) ([]gobrick.FCTargetInfo, error) {
fcPorts, err := client.GetFCPorts(context.Background())
if err != nil {
log.Error(err.Error())
return nil, err
}
var result []gobrick.FCTargetInfo
for _, t := range fcPorts {
if t.IsLinkUp && t.ApplianceID == volumeApplianceID {
result = append(result, gobrick.FCTargetInfo{WWPN: strings.Replace(t.Wwn, ":", "", -1)})
}
}
return result, nil
}
// IsK8sMetadataSupported returns info whether Metadata is supported or not
func IsK8sMetadataSupported(client gopowerstore.Client) bool {
k8sMetadataSupported := false
majorMinorVersion, err := client.GetSoftwareMajorMinorVersion(context.Background())
if err != nil {
log.Errorf("couldn't get the software version installed on the PowerStore array: %v", err)
return k8sMetadataSupported
}
if majorMinorVersion >= 3.0 {
k8sMetadataSupported = true
} else {
log.Debugf("Software version installed on the PowerStore array: %v\n", majorMinorVersion)
}
return k8sMetadataSupported
}
// GetNVMEFCTargetInfoFromStorage returns a list of gobrick compatible NVMeFC targets by quering Powerstore Array
func GetNVMEFCTargetInfoFromStorage(client gopowerstore.Client, volumeApplianceID string) ([]gobrick.NVMeTargetInfo, error) {
clusterInfo, err := client.GetCluster(context.Background())
nvmeNQN := clusterInfo.NVMeNQN
fcPorts, err := client.GetFCPorts(context.Background())
if err != nil {
log.Error(err.Error())
return nil, err
}
var result []gobrick.NVMeTargetInfo
for _, t := range fcPorts {
if t.IsLinkUp && (t.ApplianceID == volumeApplianceID || volumeApplianceID == "") {
targetAddress := strings.Replace(fmt.Sprintf("nn-0x%s:pn-0x%s", strings.Replace(t.WwnNode, ":", "", -1), strings.Replace(t.WwnNVMe, ":", "", -1)), "\n", "", -1)
result = append(result, gobrick.NVMeTargetInfo{Target: nvmeNQN, Portal: targetAddress})
}
}
return result, nil
}
// ParseCIDR parses the CIDR address to the valid start IP range with Mask
func ParseCIDR(externalAccessCIDR string) (string, error) {
// check if externalAccess has netmask bit or not
if !strings.Contains(externalAccessCIDR, "/") {
// if externalAccess is a plane ip we can add /32 from our end
externalAccessCIDR += "/32"
log.Debug("externalAccess after appending netMask bit:", externalAccessCIDR)
}
ip, ipnet, err := net.ParseCIDR(externalAccessCIDR)
if err != nil {
return "", err
}
log.Debug("Parsed CIDR:", externalAccessCIDR, "-> ip:", ip, " net:", ipnet)
start, _ := cidr.AddressRange(ipnet)
fromString, err := GetIPListWithMaskFromString(externalAccessCIDR)
if err != nil {
return "", err
}
log.Debug("IP with Mask:", fromString)
s := strings.Split(fromString, "/")
// ExernalAccess IP consists of Starting range IP of CIDR+Mask and hence concatenating the same to remove from the array
externalAccess := start.String() + "/" + s[1]
return externalAccess, nil
}
// HasRequiredTopology Checks if requiredTopology is present in the topology array and is true
func HasRequiredTopology(topologies []*csi.Topology, arrIP string, requiredTopology string) bool {
if len(topologies) == 0 || len(arrIP) == 0 || len(requiredTopology) == 0 {
return false
}
topologyKey := Name + "/" + arrIP + "-" + strings.ToLower(requiredTopology)
for _, topology := range topologies {
if value, ok := topology.Segments[topologyKey]; ok && strings.EqualFold(value, "true") {
return true
}
}
return false
}
// GetNfsTopology Returns a topology array with only nfs
func GetNfsTopology(arrIP string) []*csi.Topology {
nfsTopology := new(csi.Topology)
nfsTopology.Segments = map[string]string{Name + "/" + arrIP + "-nfs": "true"}
return []*csi.Topology{nfsTopology}
}
// Contains return true if element is present in the slice
func Contains(slice []string, element string) bool {
for _, a := range slice {
if a == element {
return true
}
}
return false
}
// ExternalAccessAlreadyAdded return true if externalAccess is present on ARRAY in any access mode type
func ExternalAccessAlreadyAdded(export gopowerstore.NFSExport, externalAccess string) bool {
externalAccess, _ = ParseCIDR(externalAccess)
if Contains(export.RWRootHosts, externalAccess) || Contains(export.RWHosts, externalAccess) || Contains(export.RORootHosts, externalAccess) || Contains(export.ROHosts, externalAccess) {
log.Debug("ExternalAccess is already added into Host Access list on array: ", externalAccess)
return true
}
log.Debug("Going to add externalAccess into Host Access list on array: ", externalAccess)
return false
}
// SetPollingFrequency reads the pollingFrequency from Env, sets default vale if ENV not found
func SetPollingFrequency(ctx context.Context) int64 {
var pollingFrequency int64
if pollRateEnv, ok := csictx.LookupEnv(ctx, EnvPodmonArrayConnectivityPollRate); ok {
if pollingFrequency, _ = strconv.ParseInt(pollRateEnv, 10, 32); pollingFrequency != 0 {
log.Debugf("use pollingFrequency as %d seconds", pollingFrequency)
return pollingFrequency
}
}
log.Debugf("use default pollingFrequency as %d seconds", DefaultPodmonPollRate)
return DefaultPodmonPollRate
}
// SetAPIPort set the port for running server
func SetAPIPort(ctx context.Context) {
if port, ok := csictx.LookupEnv(ctx, EnvPodmonAPIPORT); ok && strings.TrimSpace(port) != "" {
APIPort = fmt.Sprintf(":%s", port)
log.Debugf("set podmon API port to %s", APIPort)
return
}
// If the port number cannot be fetched, set it to default
APIPort = ":" + DefaultPodmonAPIPortNumber
log.Debugf("set podmon API port to default %s", APIPort)
}
// ReachableEndPoint checks if this endpoint is reachable or not
func ReachableEndPoint(endpoint string) bool {
// this endpoint has IP:PORT
_, err := net.DialTimeout("tcp", endpoint, 2*time.Second)
if err != nil {
return false
}
return true
}
/*
*
* Copyright © 2021-2023 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package fs provides wrappers for os/fs dependent operations.
package fs
import (
"context"
"errors"
"fmt"
"io"
"net"
"os"
"os/exec"
"path/filepath"
"syscall"
"time"
"github.com/dell/gofsutil"
log "github.com/sirupsen/logrus"
)
// A FileInfo describes a file and is returned by Stat and Lstat.
type FileInfo interface {
Name() string // base name of the file
Size() int64 // length in bytes for regular files; system-dependent for others
Mode() os.FileMode // file mode bits
ModTime() time.Time // modification time
IsDir() bool // abbreviation for Mode().IsDir()
Sys() interface{} // underlying data source (can return nil)
}
// Interface wraps usual os and fs related calls so they can be mocked.
// Also Interface provides access to the gofsutil wrapper UtilInterface with GetUtil() method.
type Interface interface {
OpenFile(name string, flag int, perm os.FileMode) (*os.File, error)
Stat(name string) (FileInfo, error)
Create(name string) (*os.File, error)
ReadFile(name string) ([]byte, error)
WriteFile(filename string, data []byte, perm os.FileMode) error
IsNotExist(err error) bool
IsDeviceOrResourceBusy(err error) bool
Mkdir(name string, perm os.FileMode) error
MkdirAll(name string, perm os.FileMode) error
Chmod(name string, perm os.FileMode) error
Remove(name string) error
RemoveAll(name string) error
WriteString(file *os.File, str string) (int, error)
ExecCommand(name string, args ...string) ([]byte, error)
ExecCommandOutput(name string, args ...string) ([]byte, error)
GetUtil() UtilInterface
// wrapper
ParseProcMounts(ctx context.Context, content io.Reader) ([]gofsutil.Info, error)
MkFileIdempotent(path string) (bool, error)
// Network
NetDial(endpoint string) (net.Conn, error)
}
// UtilInterface is a wrapper of gofsutil.fs functions so they can be mocked
type UtilInterface interface {
GetDiskFormat(ctx context.Context, disk string) (string, error)
Format(ctx context.Context, source, target, fsType string, options ...string) error
FormatAndMount(ctx context.Context, source, target, fsType string, options ...string) error
Mount(ctx context.Context, source, target, fsType string, options ...string) error
BindMount(ctx context.Context, source, target string, options ...string) error
Unmount(ctx context.Context, target string) error
GetMounts(ctx context.Context) ([]gofsutil.Info, error)
GetDevMounts(ctx context.Context, dev string) ([]gofsutil.Info, error)
ValidateDevice(ctx context.Context, source string) (string, error)
WWNToDevicePath(ctx context.Context, wwn string) (string, string, error)
RescanSCSIHost(ctx context.Context, targets []string, lun string) error
RemoveBlockDevice(ctx context.Context, blockDevicePath string) error
TargetIPLUNToDevicePath(ctx context.Context, targetIP string, lunID int) (map[string]string, error)
MultipathCommand(ctx context.Context, timeout time.Duration, chroot string, arguments ...string) ([]byte, error)
GetFCHostPortWWNs(ctx context.Context) ([]string, error)
IssueLIPToAllFCHosts(ctx context.Context) error
GetSysBlockDevicesForVolumeWWN(ctx context.Context, volumeWWN string) ([]string, error)
DeviceRescan(ctx context.Context, devicePath string) error
ResizeFS(ctx context.Context, volumePath, devicePath, ppathDevice, mpathDevice, fsType string) error
GetMountInfoFromDevice(ctx context.Context, devID string) (*gofsutil.DeviceMountInfo, error)
ResizeMultipath(ctx context.Context, deviceName string) error
FindFSType(ctx context.Context, mountpoint string) (fsType string, err error)
GetMpathNameFromDevice(ctx context.Context, device string) (string, error)
}
// Fs implementation of FsInterface that uses default os/file calls
type Fs struct {
Util *gofsutil.FS
}
// GetUtil returns gofsutil.fs wrapper -- UtilInterface.
func (fs *Fs) GetUtil() UtilInterface {
return fs.Util // #nosec G304
}
// OpenFile is a wrapper of os.OpenFile
func (fs *Fs) OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) {
return os.OpenFile(name, flag, perm) // #nosec G304
}
// WriteString is a wrapper of file.WriteString
func (fs *Fs) WriteString(file *os.File, string string) (int, error) {
return file.WriteString(string) // #nosec G304
}
// Create is a wrapper of os.Create
func (fs *Fs) Create(name string) (*os.File, error) {
return os.Create(name) // #nosec G304
}
// Chmod is a wrapper of os.Chmod
func (fs *Fs) Chmod(name string, perm os.FileMode) error {
return os.Chmod(name, perm)
}
// ReadFile is a wrapper of os.ReadFile
func (fs *Fs) ReadFile(name string) ([]byte, error) {
return os.ReadFile(filepath.Clean(name))
}
// WriteFile is a wrapper of os.WriteFile
func (fs *Fs) WriteFile(filename string, data []byte, perm os.FileMode) error {
return os.WriteFile(filepath.Clean(filename), data, perm)
}
// Stat is a wrapper of os.Stat
func (fs *Fs) Stat(name string) (FileInfo, error) {
return os.Stat(name)
}
// IsNotExist is a wrapper of os.IsNotExist
func (fs *Fs) IsNotExist(err error) bool {
return os.IsNotExist(err)
}
// IsDeviceOrResourceBusy checks for device or resource busy error
func (fs *Fs) IsDeviceOrResourceBusy(err error) bool {
return errors.Unwrap(err) == syscall.EBUSY
}
// Mkdir is a wrapper of os.Mkdir
func (fs *Fs) Mkdir(name string, perm os.FileMode) error {
return os.Mkdir(name, perm)
}
// MkdirAll is a wrapper of os.MkdirAll
func (fs *Fs) MkdirAll(name string, perm os.FileMode) error {
return os.MkdirAll(name, perm)
}
// Remove is a wrapper of os.Remove
func (fs *Fs) Remove(name string) error {
return os.Remove(name)
}
// RemoveAll is a wrapper of os.RemoveAll
func (fs *Fs) RemoveAll(name string) error {
return os.RemoveAll(name)
}
// ExecCommand is a wrapper of exec.Command that returns CombinedOutput
func (fs *Fs) ExecCommand(name string, args ...string) ([]byte, error) {
return exec.Command(name, args...).CombinedOutput() // #nosec G204
}
// ExecCommandOutput is a wrapper of exec.Command that returns default Output
func (fs *Fs) ExecCommandOutput(name string, args ...string) ([]byte, error) {
return exec.Command(name, args...).Output() // #nosec G204
}
// ParseProcMounts is wrapper of gofsutil.ReadProcMountsFrom global function
func (fs *Fs) ParseProcMounts(
ctx context.Context,
content io.Reader,
) ([]gofsutil.Info, error) {
r, _, err := gofsutil.ReadProcMountsFrom(ctx, content, false,
gofsutil.ProcMountsFields, gofsutil.DefaultEntryScanFunc())
return r, err
}
// NetDial is a wrapper for net.Dial func. Uses UDP and 80 port.
func (fs *Fs) NetDial(endpoint string) (net.Conn, error) {
return net.Dial("udp", fmt.Sprintf("%s:80", endpoint))
}
// MkFileIdempotent creates file if there is none
func (fs *Fs) MkFileIdempotent(path string) (bool, error) {
st, err := fs.Stat(path)
if fs.IsNotExist(err) {
file, err := fs.OpenFile(path, os.O_CREATE, 0o600)
if err != nil {
log.WithField("path", path).WithError(err).Error("Unable to create file")
return false, err
}
if err = file.Close(); err != nil {
return false, fmt.Errorf("could not close file")
}
log.WithField("path", path).Debug("created file")
return true, nil
}
if st.IsDir() {
return false, fmt.Errorf("existing path is a directory")
}
return false, nil
}
/*
Copyright (c) 2023 Dell Inc, or its subsidiaries.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package k8sutils
import (
"context"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
// NodeLabelsRetrieverInterface defines the methods for retrieving Kubernetes Node Labels
type NodeLabelsRetrieverInterface interface {
BuildConfigFromFlags(masterURL, kubeconfig string) (*rest.Config, error)
InClusterConfig() (*rest.Config, error)
NewForConfig(config *rest.Config) (*kubernetes.Clientset, error)
GetNodeLabels(ctx context.Context, k8sclientset *kubernetes.Clientset, kubeNodeName string) (map[string]string, error)
}
// NodeLabelsRetrieverImpl provided the implementation for NodeLabelsRetrieverInterface
type NodeLabelsRetrieverImpl struct{}
// NodeLabelsRetriever is the actual instance of NodeLabelsRetrieverInterface which is used to retrieve the node labels
var NodeLabelsRetriever NodeLabelsRetrieverInterface
func init() {
NodeLabelsRetriever = new(NodeLabelsRetrieverImpl)
}
// BuildConfigFromFlags is a method for building kubernetes client config
func (svc *NodeLabelsRetrieverImpl) BuildConfigFromFlags(masterURL, kubeconfig string) (*rest.Config, error) {
return clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
}
// InClusterConfig returns a config object which uses the service account kubernetes gives to pods
func (svc *NodeLabelsRetrieverImpl) InClusterConfig() (*rest.Config, error) {
return rest.InClusterConfig()
}
// NewForConfig creates a new Clientset for the given config
func (svc *NodeLabelsRetrieverImpl) NewForConfig(config *rest.Config) (*kubernetes.Clientset, error) {
return kubernetes.NewForConfig(config)
}
// GetNodeLabels retrieves the kubernetes node object and returns its labels
func (svc *NodeLabelsRetrieverImpl) GetNodeLabels(ctx context.Context, k8sclientset *kubernetes.Clientset, kubeNodeName string) (map[string]string, error) {
if k8sclientset != nil {
node, err := k8sclientset.CoreV1().Nodes().Get(ctx, kubeNodeName, v1.GetOptions{})
if err != nil {
return nil, err
}
return node.Labels, nil
}
return nil, nil
}
// CreateKubeClientSet creates and returns kubeclient set
func CreateKubeClientSet(kubeconfig string) (*kubernetes.Clientset, error) {
var clientset *kubernetes.Clientset
if kubeconfig != "" {
config, err := NodeLabelsRetriever.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return nil, err
}
// create the clientset
clientset, err = NodeLabelsRetriever.NewForConfig(config)
if err != nil {
return nil, err
}
} else {
config, err := NodeLabelsRetriever.InClusterConfig()
if err != nil {
return nil, err
}
// creates the clientset
clientset, err = NodeLabelsRetriever.NewForConfig(config)
if err != nil {
return nil, err
}
}
return clientset, nil
}
// GetNodeLabels returns labels present in the k8s node
func GetNodeLabels(ctx context.Context, kubeConfigPath string, kubeNodeName string) (map[string]string, error) {
k8sclientset, err := CreateKubeClientSet(kubeConfigPath)
if err != nil {
return nil, err
}
return NodeLabelsRetriever.GetNodeLabels(ctx, k8sclientset, kubeNodeName)
}
/*
*
* Copyright © 2021 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package common
import (
"context"
log "github.com/sirupsen/logrus"
)
// CustomLogger is logger wrapper that can be passed to gopowerstore, gobrick allowing to logging context fields with each call
type CustomLogger struct{}
// Info is a wrapper of logrus Info method
func (lg *CustomLogger) Info(ctx context.Context, format string, args ...interface{}) {
log.WithFields(GetLogFields(ctx)).Infof(format, args...)
}
// Debug is a wrapper of logrus Debug method
func (lg *CustomLogger) Debug(ctx context.Context, format string, args ...interface{}) {
log.WithFields(GetLogFields(ctx)).Debugf(format, args...)
}
// Error is a wrapper of logrus Error method
func (lg *CustomLogger) Error(ctx context.Context, format string, args ...interface{}) {
log.WithFields(GetLogFields(ctx)).Errorf(format, args...)
}
/*
*
* Copyright © 2021-2024 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package controller
import (
"context"
"strings"
"unicode/utf8"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/dell/csi-powerstore/v2/pkg/common"
"github.com/dell/gopowerstore"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)
const (
// MinVolumeSizeBytes is minimal size for volume creation on PowerStore
MinVolumeSizeBytes = 1048576
// MinFilesystemSizeBytes is minimal size for Filesystem creation on PowerStore - 1.5Gi
MinFilesystemSizeBytes = 1610612736
// MaxVolumeSizeBytes is maximum size for volume creation on PowerStore
MaxVolumeSizeBytes = 1099511627776 * 256 // 256 TB
// VolumeSizeMultiple multiplier for volumes
VolumeSizeMultiple = 8192
// MaxVolumeNameLength max length for the volume name
MaxVolumeNameLength = 128
// ReplicationPrefix represents replication prefix
ReplicationPrefix = "replication.storage.dell.com"
// ErrUnknownAccessType represents error message for unknown access type
ErrUnknownAccessType = "unknown access type is not Block or Mount"
// ErrUnknownAccessMode represents error message for unknown access mode
ErrUnknownAccessMode = "access mode cannot be UNKNOWN"
// ErrNoMultiNodeWriter represents error message for multi node access
ErrNoMultiNodeWriter = "multi-node with writer(s) only supported for block access type"
// KeyFsType represents key for Fs Type
KeyFsType = "csi.storage.k8s.io/fstype"
// KeyFsTypeOld represents old key for Fs Type
KeyFsTypeOld = "FsType"
// KeyReplicationEnabled represents key for replication enabled
KeyReplicationEnabled = "isReplicationEnabled"
// KeyReplicationMode represents key for replication mode
KeyReplicationMode = "mode"
// KeyReplicationRPO represents key for replication RPO
KeyReplicationRPO = "rpo"
// KeyReplicationRemoteSystem represents key for replication remote system
KeyReplicationRemoteSystem = "remoteSystem"
// KeyReplicationIgnoreNamespaces represents key for replication ignore namespaces
KeyReplicationIgnoreNamespaces = "ignoreNamespaces"
// KeyReplicationVGPrefix represents key for replication vg prefix
KeyReplicationVGPrefix = "volumeGroupPrefix"
// KeyNasName represents key for nas name
KeyNasName = "nasName"
// KeyCSIPVCNamespace represents key for csi pvc namespace
KeyCSIPVCNamespace = "csi.storage.k8s.io/pvc/namespace"
// KeyCSIPVCName represents key for csi pvc name
KeyCSIPVCName = "csi.storage.k8s.io/pvc/name"
)
func volumeNameValidation(volumeName string) error {
if volumeName == "" {
return status.Errorf(codes.InvalidArgument, "name cannot be empty")
}
if utf8.RuneCountInString(volumeName) > MaxVolumeNameLength {
return status.Errorf(codes.InvalidArgument, "name must contain %d or fewer printable Unicode characters", MaxVolumeNameLength)
}
return nil
}
func volumeSizeValidation(minSize, maxSize int64) error {
if minSize < 0 || maxSize < 0 {
return status.Errorf(
codes.OutOfRange,
"bad capacity: volume size bytes %d and limit size bytes: %d must not be negative", minSize, maxSize)
}
if maxSize < minSize {
return status.Errorf(
codes.OutOfRange,
"bad capacity: max size bytes %d can't be less than minimum size bytes %d", maxSize, minSize)
}
if maxSize > MaxVolumeSizeBytes {
return status.Errorf(
codes.OutOfRange,
"bad capacity: max size bytes %d can't be more than maximum size bytes %d", maxSize, MaxVolumeSizeBytes)
}
return nil
}
func getCSIVolume(volumeID string, size int64) *csi.Volume {
volume := &csi.Volume{
VolumeId: volumeID,
CapacityBytes: size,
}
return volume
}
func getCSISnapshot(snapshotID string, sourceVolumeID string, sizeInBytes int64) *csi.Snapshot {
snap := &csi.Snapshot{
SizeBytes: sizeInBytes,
SnapshotId: snapshotID,
SourceVolumeId: sourceVolumeID,
CreationTime: timestamppb.Now(),
ReadyToUse: true,
}
return snap
}
func detachVolumeFromHost(ctx context.Context, hostID string, volumeID string, client gopowerstore.Client) error {
dp := &gopowerstore.HostVolumeDetach{VolumeID: &volumeID}
_, err := client.DetachVolumeFromHost(ctx, hostID, dp)
if err != nil {
apiError, ok := err.(gopowerstore.APIError)
if !ok {
return status.Errorf(codes.Unknown, "failed to detach volume '%s' from host: %s", volumeID, err.Error())
}
// In case of resiliency we can have multiple calls simultaneously (from podmon and k8) so to keep it idempotent
if strings.Contains(apiError.Message, "Host is not attached to volume") {
return nil
}
if apiError.HostIsNotExist() {
return status.Errorf(codes.NotFound, "host with ID '%s' not found", hostID)
}
if !apiError.VolumeIsNotAttachedToHost() && !apiError.HostIsNotAttachedToVolume() && !apiError.NotFound() && !apiError.VolumeDetachedFromHost() {
return status.Errorf(codes.Unknown, "unexpected api error when detaching volume from host:%s", err.Error())
}
}
return nil
}
func accTypeIsBlock(vcs []*csi.VolumeCapability) bool {
for _, vc := range vcs {
if at := vc.GetBlock(); at != nil {
return true
}
}
return false
}
func checkValidAccessTypes(vcs []*csi.VolumeCapability) bool {
for _, vc := range vcs {
if vc == nil {
continue
}
atblock := vc.GetBlock()
if atblock != nil {
continue
}
atmount := vc.GetMount()
if atmount != nil {
continue
}
// Unknown access type, we should reject it.
return false
}
return true
}
func getDescription(params map[string]string) string {
if description, ok := params[common.KeyVolumeDescription]; ok {
return description
}
return params[KeyCSIPVCName] + "-" + params[KeyCSIPVCNamespace]
}
/*
*
* Copyright © 2021-2024 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package controller provides CSI specification compatible controller service.
package controller
import (
"context"
"errors"
"fmt"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/dell/csi-powerstore/v2/core"
"github.com/dell/csi-powerstore/v2/pkg/array"
"github.com/dell/csi-powerstore/v2/pkg/common"
"github.com/dell/csi-powerstore/v2/pkg/common/fs"
commonext "github.com/dell/dell-csi-extensions/common"
podmon "github.com/dell/dell-csi-extensions/podmon"
csiext "github.com/dell/dell-csi-extensions/replication"
vgsext "github.com/dell/dell-csi-extensions/volumeGroupSnapshot"
csictx "github.com/dell/gocsi/context"
"github.com/dell/gopowerstore"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/wrapperspb"
)
// Interface provides most important controller methods.
// This essentially serves as a wrapper for controller service that is used in ephemeral volumes.
type Interface interface {
CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error)
DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error)
ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error)
ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error)
array.Consumer
}
// Service is a controller service that contains array connection information and implements ControllerServer API
type Service struct {
Fs fs.Interface
externalAccess string
nfsAcls string
array.Locker
replicationContextPrefix string
replicationPrefix string
isHealthMonitorEnabled bool
isAutoRoundOffFsSizeEnabled bool
}
// maxVolumesSizeForArray - store the maxVolumesSizeForArray
var maxVolumesSizeForArray = make(map[string]int64)
var mutex = &sync.Mutex{}
// Init is a method that initializes internal variables of controller service
func (s *Service) Init() error {
ctx := context.Background()
if nat, ok := csictx.LookupEnv(ctx, common.EnvExternalAccess); ok {
s.externalAccess = nat
}
if replicationContextPrefix, ok := csictx.LookupEnv(ctx, common.EnvReplicationContextPrefix); ok {
s.replicationContextPrefix = replicationContextPrefix + "/"
}
if replicationPrefix, ok := csictx.LookupEnv(ctx, common.EnvReplicationPrefix); ok {
s.replicationPrefix = replicationPrefix
}
if isHealthMonitorEnabled, ok := csictx.LookupEnv(ctx, common.EnvIsHealthMonitorEnabled); ok {
s.isHealthMonitorEnabled, _ = strconv.ParseBool(isHealthMonitorEnabled)
}
s.nfsAcls = ""
if nfsAcls, ok := csictx.LookupEnv(ctx, common.EnvNfsAcls); ok {
if nfsAcls != "" {
s.nfsAcls = nfsAcls
}
}
if isAutoRoundOffFsSizeEnabled, ok := csictx.LookupEnv(ctx, common.EnvAllowAutoRoundOffFilesystemSize); ok {
log.Warn("Auto round off Filesystem size has been enabled! This will round off NFS PVC size to 3Gi when the requested size is less than 3Gi.")
s.isAutoRoundOffFsSizeEnabled, _ = strconv.ParseBool(isAutoRoundOffFsSizeEnabled)
}
return nil
}
// CreateVolume creates either FileSystem or Volume on storage array.
func (s *Service) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
params := req.GetParameters()
// Get array from map
arrayID, ok := params[common.KeyArrayID]
var arr *array.PowerStoreArray
// If no ArrayID was provided in storage class we just use default array
if !ok {
if _, ok := params["arrayIP"]; ok {
return nil, status.Error(codes.Internal, "Array IP's been provided, however it is not supported in "+
"current version. Configure you storage classes according to the documentation")
}
arr = s.DefaultArray()
} else {
arr, ok = s.Arrays()[arrayID]
if !ok {
return nil, status.Errorf(codes.Internal, "can't find array with provided id %s", arrayID)
}
}
// Check if should use nfs
useNFS := false
fsType := req.VolumeCapabilities[0].GetMount().GetFsType()
useNFS = fsType == "nfs"
if req.VolumeCapabilities[0].GetBlock() != nil {
// We need to check if user requests raw block access from nfs and prevent that
fsType, ok := params[KeyFsType]
// FsType can be empty
if ok && fsType == "nfs" {
return nil, status.Errorf(codes.InvalidArgument, "raw block requested from NFS Volume")
}
fsType, ok = params[KeyFsTypeOld]
if ok && fsType == "nfs" {
return nil, status.Errorf(codes.InvalidArgument, "raw block requested from NFS Volume")
}
}
// Prevent user from creating an NFS volume with incorrect topology(e.g. iscsi, nvme). At least one entry for nfs should be present in the topology, otherwise return an error
if useNFS && req.AccessibilityRequirements != nil {
if ok := common.HasRequiredTopology(req.AccessibilityRequirements.Preferred, arr.GetIP(), "nfs"); !ok {
return nil, status.Errorf(codes.InvalidArgument, "invalid topology requested for NFS Volume. Please validate your storage class has nfs topology.")
}
}
var creator VolumeCreator
var protocol string
nfsAcls := s.nfsAcls
if useNFS {
protocol = "nfs"
nasParamsName, ok := params[KeyNasName]
if ok {
creator = &NfsCreator{
nasName: nasParamsName,
}
} else {
creator = &NfsCreator{
nasName: arr.GetNasName(),
}
}
if params[common.KeyNfsACL] != "" {
nfsAcls = params[common.KeyNfsACL] // Storage class takes precedence
} else if arr.NfsAcls != "" {
nfsAcls = arr.NfsAcls // Secrets next
}
} else {
protocol = "scsi"
creator = &SCSICreator{}
}
var topology []*csi.Topology
if req.AccessibilityRequirements != nil {
topology = req.AccessibilityRequirements.Preferred
}
if err := creator.CheckName(ctx, req.GetName()); err != nil {
return nil, err
}
sizeInBytes, err := creator.CheckSize(ctx, req.GetCapacityRange(), s.isAutoRoundOffFsSizeEnabled)
if err != nil {
return nil, err
}
contentSource := req.GetVolumeContentSource()
if contentSource != nil {
var volResp *csi.Volume
var err error
volumeSource := contentSource.GetVolume()
if volumeSource != nil {
log.Printf("volume %s specified as volume content source", volumeSource.VolumeId)
parsedID, _, _, _, _, _ := array.ParseVolumeID(ctx, volumeSource.VolumeId, s.DefaultArray(), nil)
volumeSource.VolumeId = parsedID
volResp, err = creator.Clone(ctx, volumeSource, req.GetName(), sizeInBytes, req.Parameters, arr.GetClient())
}
snapshotSource := contentSource.GetSnapshot()
if snapshotSource != nil {
log.Printf("snapshot %s specified as volume content source", snapshotSource.SnapshotId)
parsedID, _, _, _, _, _ := array.ParseVolumeID(ctx, snapshotSource.SnapshotId, s.DefaultArray(), nil)
snapshotSource.SnapshotId = parsedID
volResp, err = creator.CreateVolumeFromSnapshot(ctx, snapshotSource,
req.GetName(), sizeInBytes, req.Parameters, arr.GetClient())
}
if err != nil {
resp, err := creator.CheckIfAlreadyExists(ctx, req.GetName(), sizeInBytes, arr.GetClient())
if err != nil {
return nil, err
}
if snapshotSource != nil {
volResp = getCSIVolumeFromSnapshot(resp.VolumeId, snapshotSource, sizeInBytes)
} else {
volResp = getCSIVolumeFromClone(resp.VolumeId, volumeSource, sizeInBytes)
}
volResp.VolumeContext = req.Parameters
}
if volResp == nil {
return nil, err
}
volResp.VolumeId = volResp.VolumeId + "/" + arr.GetGlobalID() + "/" + protocol
if useNFS {
topology = common.GetNfsTopology(arr.GetIP())
log.Infof("Modified topology to nfs for %s", req.GetName())
}
volResp.AccessibleTopology = topology
return &csi.CreateVolumeResponse{
Volume: volResp,
}, nil
}
var vg gopowerstore.VolumeGroup
var remoteSystem gopowerstore.RemoteSystem
// Check if replication is enabled
replicationEnabled := params[s.WithRP(KeyReplicationEnabled)]
var remoteSystemName string
isMetroVolume := false
isMetroVolumeGroup := false
if replicationEnabled == "true" && !useNFS {
log.Info("Preparing volume replication")
remoteSystemName, ok = params[s.WithRP(KeyReplicationRemoteSystem)]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "replication enabled but no remote system specified in storage class")
}
repMode := params[s.WithRP(KeyReplicationMode)]
if repMode == "" {
repMode = "ASYNC"
}
repMode = strings.ToUpper(repMode)
switch repMode {
case "SYNC", "ASYNC":
// handle Sync and Async modes where protection policy with replication rule is applied on volume group
log.Infof("%s replication mode requested", repMode)
vgPrefix, ok := params[s.WithRP(KeyReplicationVGPrefix)]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "replication enabled but no volume group prefix specified in storage class")
}
rpo, ok := params[s.WithRP(KeyReplicationRPO)]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "replication enabled but no RPO specified in storage class")
}
rpoEnum := gopowerstore.RPOEnum(rpo)
if err := rpoEnum.IsValid(); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid RPO value")
}
namespace := ""
if ignoreNS, ok := params[s.WithRP(KeyReplicationIgnoreNamespaces)]; ok && ignoreNS == "false" {
pvcNS, ok := params[KeyCSIPVCNamespace]
if ok {
namespace = pvcNS + "-"
}
}
vgName := vgPrefix + "-" + namespace + remoteSystemName + "-" + rpo
if len(vgName) > 128 {
vgName = vgName[:128]
}
vg, err = arr.Client.GetVolumeGroupByName(ctx, vgName)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.NotFound() {
log.Infof("Volume group with name %s not found, creating it", vgName)
// ensure protection policy exists
pp, err := EnsureProtectionPolicyExists(ctx, arr, vgName, remoteSystemName, rpoEnum)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't ensure protection policy exists %s", err.Error())
}
group, err := arr.Client.CreateVolumeGroup(ctx, &gopowerstore.VolumeGroupCreate{
Name: vgName,
ProtectionPolicyID: pp,
})
if err != nil {
return nil, status.Errorf(codes.Internal, "can't create volume group: %s", err.Error())
}
vg, err = arr.Client.GetVolumeGroup(ctx, group.ID)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't query volume group by id %s : %s", group.ID, err.Error())
}
} else {
return nil, status.Errorf(codes.Internal, "can't query volume group by name %s : %s", vgName, err.Error())
}
} else {
// group exists, check that protection policy applied
if vg.ProtectionPolicyID == "" {
pp, err := EnsureProtectionPolicyExists(ctx, arr, vgName, remoteSystemName, rpoEnum)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't ensure protection policy exists %s", err.Error())
}
policyUpdate := gopowerstore.VolumeGroupChangePolicy{ProtectionPolicyID: pp}
_, err = arr.Client.UpdateVolumeGroupProtectionPolicy(ctx, vg.ID, &policyUpdate)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't update volume group policy %s", err.Error())
}
}
}
if c, ok := creator.(*SCSICreator); ok {
c.vg = &vg
}
case "METRO":
// handle Metro mode where metro is configured directly on the volume (or volume group if requested)
log.Info("Metro replication mode requested")
// Get specified remote system object for its ID
remoteSystem, err = arr.Client.GetRemoteSystemByName(ctx, remoteSystemName)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't query remote system by name: %s", err.Error())
}
// TODO If volumeGroup input is specified in SC - Verify VolumeGroup exists, if not create one
// There shouldn't be any protection policy on it with replication rule
// Cannot configure Metro on empty VG. Configure after volume is added
// Check if the above sync/async block can be optimized w.r.t volume group calls
// isMetroVolumeGroup = true
isMetroVolume = true // set to true if volume group is not specified
default:
return nil, status.Errorf(codes.InvalidArgument, "replication enabled but invalid replication mode specified in storage class")
}
}
params[common.KeyVolumeDescription] = getDescription(req.GetParameters())
var volumeResponse *csi.Volume
resp, err := creator.Create(ctx, req, sizeInBytes, arr.GetClient())
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && (apiError.VolumeNameIsAlreadyUse() || apiError.FSNameIsAlreadyUse()) {
volumeResponse, err = creator.CheckIfAlreadyExists(ctx, req.GetName(), sizeInBytes, arr.GetClient())
if err != nil {
return nil, err
}
} else {
return nil, status.Error(codes.Internal, err.Error())
}
} else {
volumeResponse = getCSIVolume(resp.ID, sizeInBytes)
}
metroVolumeIDSuffix := ""
if isMetroVolume {
// Configure Metro on volume
volID := volumeResponse.VolumeId
log.Infof("Configuring Metro on volume %s", volID)
metroSession, err := arr.GetClient().ConfigureMetroVolume(ctx, volID, &gopowerstore.MetroConfig{
RemoteSystemID: remoteSystem.ID,
})
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.ReplicationSessionAlreadyCreated() { // idempotency check
log.Debugf("Metro has already been configured on volume %s", volID)
} else {
return nil, status.Errorf(codes.Internal, "can't configure metro on volume: %s", err.Error())
}
} else {
log.Infof("Metro Session %s created for volume %s", metroSession.ID, volID)
}
// Get the remote volume ID from the replication session.
replicationSession, err := arr.GetClient().GetReplicationSessionByLocalResourceID(ctx, volID)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not get metro replication session: %s", err.Error())
}
// Confirm the replication session is of the 'volume' type
if strings.ToLower(replicationSession.ResourceType) != "volume" {
return nil, status.Errorf(codes.FailedPrecondition, "replication session %s has a resource type %s, wanted type 'volume'",
replicationSession.ID, replicationSession.ResourceType)
}
// Build the metro volume handle suffix
metroVolumeIDSuffix = ":" + replicationSession.RemoteResourceID + "/" + remoteSystem.SerialNumber
} else if isMetroVolumeGroup {
// TODO configure Metro on volume group if it is first time
// else pause and resume metro session for adding new volumes
// Session needs to be paused before the new volume can be added (before creator.Create()) and then resumed later here.
log.Warn("Configuring Metro on volume group, not yet implemented.")
}
// Fetch the service tag
serviceTag := GetServiceTag(ctx, req, arr, volumeResponse.VolumeId, protocol)
volumeResponse.VolumeContext = req.Parameters
volumeResponse.VolumeContext[common.KeyArrayID] = arr.GetGlobalID()
volumeResponse.VolumeContext[common.KeyArrayVolumeName] = req.Name
volumeResponse.VolumeContext[common.KeyProtocol] = protocol
volumeResponse.VolumeContext[common.KeyServiceTag] = serviceTag
if useNFS {
volumeResponse.VolumeContext[common.KeyNfsACL] = nfsAcls
volumeResponse.VolumeContext[common.KeyNasName] = arr.GetNasName()
topology = common.GetNfsTopology(arr.GetIP())
log.Infof("Modified topology to nfs for %s", req.GetName())
}
volumeResponse.VolumeId = volumeResponse.VolumeId + "/" + arr.GetGlobalID() + "/" + protocol + metroVolumeIDSuffix
volumeResponse.AccessibleTopology = topology
return &csi.CreateVolumeResponse{
Volume: volumeResponse,
}, nil
}
// DeleteVolume deletes either FileSystem or Volume from storage array.
func (s *Service) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
id := req.GetVolumeId()
if id == "" {
return nil, status.Error(codes.InvalidArgument, "volume ID is required")
}
id, arrayID, protocol, _, _, err := array.ParseVolumeID(ctx, id, s.DefaultArray(), nil)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.NotFound() {
return &csi.DeleteVolumeResponse{}, nil
}
return nil, err
}
arr, ok := s.Arrays()[arrayID]
if !ok {
return nil, status.Errorf(codes.Internal, "can't find array with provided id %s", arrayID)
}
if protocol == "nfs" {
listSnaps, err := arr.GetClient().GetFsSnapshotsByVolumeID(ctx, id)
if err != nil {
return nil, status.Errorf(codes.Unknown, "failure getting snapshot: %s", err.Error())
}
if len(listSnaps) > 0 {
return nil, status.Errorf(codes.FailedPrecondition,
"unable to delete FS volume -- snapshots based on this volume still exist: %v",
listSnaps)
}
// Validate if filesystem has any NFS or SMB shares or snapshots attached
nfsExportResp, _ := arr.GetClient().GetNFSExportByFileSystemID(ctx, id)
if len(nfsExportResp.ROHosts) > 0 ||
len(nfsExportResp.RORootHosts) > 0 ||
len(nfsExportResp.RWHosts) > 0 ||
len(nfsExportResp.RWRootHosts) > 0 {
// if one entry is there for RWRootHosts or RWHosts, check if this is the same externalAccess defined in value.yaml
// if yes modifyNFSExport and remove externalAccess from the HostAcceesList on the array
if (len(nfsExportResp.RWRootHosts) == 1 || len(nfsExportResp.RWHosts) == 1) && s.externalAccess != "" {
externalAccess, err := common.ParseCIDR(s.externalAccess)
if err != nil {
log.Debug("error occurred while parsing externalAccess: ", err.Error(), s.externalAccess)
return nil, status.Errorf(codes.FailedPrecondition,
"filesystem %s cannot be deleted as it has associated NFS or SMB shares.",
id)
}
modifyNFSExport := false
// we need to construct the payload dynamically otherwise 400 error will be thrown
var modifyHostPayload gopowerstore.NFSExportModify
// Removing externalAccess from RWHosts as well as RWRootHosts
if len(nfsExportResp.RWRootHosts) == 1 && externalAccess == nfsExportResp.RWRootHosts[0] {
log.Debug("Trying to remove externalAccess IP with mask having RWRootHosts access while deleting the volume: ", externalAccess)
modifyNFSExport = true
modifyHostPayload.RemoveRWRootHosts = []string{externalAccess}
}
if len(nfsExportResp.RWHosts) == 1 && externalAccess == nfsExportResp.RWHosts[0] {
log.Debug("Trying to remove externalAccess IP with mask having RWHosts access while deleting the volume: ", externalAccess)
modifyNFSExport = true
modifyHostPayload.RemoveRWHosts = []string{externalAccess}
}
// call ModifyNFSExport API only when payload is not empty i.e. something is there to modify
if modifyNFSExport {
_, err = arr.GetClient().ModifyNFSExport(ctx, &modifyHostPayload, nfsExportResp.ID)
if err != nil {
log.Debug("failure when removing externalAccess from nfs export: ", err.Error())
if apiError, ok := err.(gopowerstore.APIError); !(ok && apiError.HostAlreadyRemovedFromNFSExport()) {
return nil, status.Errorf(codes.FailedPrecondition,
"filesystem %s cannot be deleted as it has associated NFS or SMB shares.",
id)
}
}
} else {
// either of RWRootHosts or RWHosts has one entry but it is not externalAccess
return nil, status.Errorf(codes.FailedPrecondition,
"filesystem %s cannot be deleted as it has associated NFS or SMB shares.",
id)
}
} else {
return nil, status.Errorf(codes.FailedPrecondition,
"filesystem %s cannot be deleted as it has associated NFS or SMB shares.",
id)
}
}
_, err = arr.GetClient().DeleteFS(ctx, id)
if err == nil {
return &csi.DeleteVolumeResponse{}, nil
}
if apiError, ok := err.(gopowerstore.APIError); ok {
if apiError.NotFound() {
return &csi.DeleteVolumeResponse{}, nil
}
}
return nil, err
} else if protocol == "scsi" {
// query volume groups?
vgs, err := arr.GetClient().GetVolumeGroupsByVolumeID(ctx, id)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); !ok || !apiError.NotFound() {
return nil, err
}
}
if len(vgs.VolumeGroup) != 0 {
// Remove volume from volume group
// TODO: If volume has multiple volume group then how we should find ours?
// TODO: Maybe adding volumegroup id/name to volume id can help?
_, err := arr.GetClient().RemoveMembersFromVolumeGroup(ctx, &gopowerstore.VolumeGroupMembers{VolumeIDs: []string{id}}, vgs.VolumeGroup[0].ID)
if err != nil {
// TODO: check for idempotency cases
return nil, err
}
// Unassign protection policy
_, err = arr.GetClient().ModifyVolume(ctx, &gopowerstore.VolumeModify{ProtectionPolicyID: ""}, id)
if err != nil {
return nil, err
}
}
// TODO: if len(vgs.VolumeGroup == 1) && it is the last volume : delete volume group
// TODO: What to do with RPO snaps?
listSnaps, err := arr.GetClient().GetSnapshotsByVolumeID(ctx, id)
if err != nil {
return nil, status.Errorf(codes.Unknown, "failure getting snapshot: %s", err.Error())
}
if len(listSnaps) > 0 {
return nil, status.Errorf(codes.FailedPrecondition,
"unable to delete volume -- %d snapshots based on this volume still exist.", len(listSnaps))
}
// Check if volume has metro session and end it
volume, err := arr.GetClient().GetVolume(ctx, id)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.NotFound() {
log.Infof("Volume %s not found, it may have been deleted.", id)
return &csi.DeleteVolumeResponse{}, nil
}
return nil, status.Errorf(codes.Internal, "failure getting volume: %s", err.Error())
}
if volume.MetroReplicationSessionID != "" {
_, err = arr.GetClient().EndMetroVolume(ctx, id, &gopowerstore.EndMetroVolumeOptions{
DeleteRemoteVolume: true, // delete remote volume when deleting local volume
})
if err != nil {
return nil, status.Errorf(codes.Internal, "failure ending metro session on volume: %s", err.Error())
}
}
// Delete volume
_, err = arr.GetClient().DeleteVolume(ctx, nil, id)
if err == nil {
return &csi.DeleteVolumeResponse{}, nil
}
if apiError, ok := err.(gopowerstore.APIError); ok {
if apiError.NotFound() {
return &csi.DeleteVolumeResponse{}, nil
}
if apiError.VolumeAttachedToHost() {
return nil, status.Errorf(codes.Internal,
"volume with ID '%s' is still attached to host: %s", id, apiError.Error())
}
}
return nil, err
}
return nil, status.Errorf(codes.InvalidArgument, "can't figure out protocol")
}
// ControllerPublishVolume prepares Volume/FileSystem to be consumed by node by attaching/allowing access to the host.
func (s *Service) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
id := req.GetVolumeId()
if id == "" {
return nil, status.Error(codes.InvalidArgument, "volume ID is required")
}
id, arrayID, protocol, _, _, err := array.ParseVolumeID(ctx, id, s.DefaultArray(), req.VolumeCapability)
if err != nil {
log.Info(err)
return nil, err
}
arr, ok := s.Arrays()[arrayID]
if !ok {
log.Info("ip is nil")
return nil, status.Error(codes.InvalidArgument, "failed to find array with given ID")
}
vc := req.GetVolumeCapability()
if vc == nil {
return nil, status.Error(codes.InvalidArgument, "volume capability is required")
}
am := vc.GetAccessMode()
if am == nil {
return nil, status.Error(codes.InvalidArgument, "access mode is required")
}
if am.Mode == csi.VolumeCapability_AccessMode_UNKNOWN {
return nil, status.Error(codes.InvalidArgument, ErrUnknownAccessMode)
}
kubeNodeID := req.GetNodeId()
if kubeNodeID == "" {
return nil, status.Error(codes.InvalidArgument, "node ID is required")
}
var publisher VolumePublisher
if protocol == "nfs" {
publisher = &NfsPublisher{
ExternalAccess: s.externalAccess,
}
} else {
publisher = &SCSIPublisher{}
}
if err := publisher.CheckIfVolumeExists(ctx, arr.GetClient(), id); err != nil {
return nil, err
}
return publisher.Publish(ctx, req, arr.GetClient(), kubeNodeID, id)
}
// ControllerUnpublishVolume prepares Volume/FileSystem to be deleted by unattaching/disabling access to the host.
func (s *Service) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
id := req.GetVolumeId()
if id == "" {
return nil, status.Error(codes.InvalidArgument, "volume ID is required")
}
kubeNodeID := req.GetNodeId()
if kubeNodeID == "" {
return nil, status.Error(codes.InvalidArgument, "node ID is required")
}
id, arrayID, protocol, _, _, err := array.ParseVolumeID(ctx, id, s.DefaultArray(), nil)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.NotFound() {
return &csi.ControllerUnpublishVolumeResponse{}, nil
}
return nil, status.Errorf(codes.Unknown,
"failure checking volume status for volume unpublishing: %s", err.Error())
}
arr, ok := s.Arrays()[arrayID]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "cannot find array %s", arrayID)
}
if protocol == "scsi" {
node, err := arr.GetClient().GetHostByName(ctx, kubeNodeID)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.HostIsNotExist() {
// We need additional check here since we can just have host without ip in it
ipList := common.GetIPListFromString(kubeNodeID)
if ipList == nil {
return nil, errors.New("can't find IP in nodeID")
}
ip := ipList[len(ipList)-1]
nodeID := kubeNodeID[:len(kubeNodeID)-len(ip)-1]
node, err = arr.GetClient().GetHostByName(ctx, nodeID)
if err != nil {
return nil, status.Errorf(codes.NotFound, "host with k8s node ID '%s' not found", kubeNodeID)
}
} else {
return nil, status.Errorf(codes.Internal,
"failure checking host '%s' status for volume unpublishing: %s", kubeNodeID, err.Error())
}
}
err = detachVolumeFromHost(ctx, node.ID, id, arr.GetClient())
if err != nil {
return nil, err
}
return &csi.ControllerUnpublishVolumeResponse{}, nil
} else if protocol == "nfs" {
fs, err := arr.GetClient().GetFS(ctx, id)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.NotFound() {
return &csi.ControllerUnpublishVolumeResponse{}, nil
}
return nil, status.Errorf(codes.Unknown, "failure checking volume status for volume unpublishing: %s", err.Error())
}
// Parse volumeID to get an IP
ipList := common.GetIPListFromString(kubeNodeID)
if ipList == nil {
return nil, errors.New("can't find IP in nodeID")
}
ip := ipList[0]
export, err := arr.GetClient().GetNFSExportByFileSystemID(ctx, fs.ID)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.NotFound() {
return &csi.ControllerUnpublishVolumeResponse{}, nil
}
return nil, status.Errorf(codes.Internal,
"failure checking nfs export status for volume unpublishing: %s", err.Error())
}
// we need to construct the payload dynamically otherwise 400 error will be thrown
var modifyHostPayload gopowerstore.NFSExportModify
sort.Strings(export.ROHosts)
index := sort.SearchStrings(export.ROHosts, ip)
if len(export.ROHosts) > 0 {
if index >= 0 {
modifyHostPayload.RemoveROHosts = []string{ip + "/255.255.255.255"} // we can't remove without netmask
log.Debug("Going to remove IP from ROHosts: ", modifyHostPayload.RemoveROHosts[0])
}
}
sort.Strings(export.RORootHosts)
index = sort.SearchStrings(export.RORootHosts, ip)
if len(export.RORootHosts) > 0 {
if index >= 0 {
modifyHostPayload.RemoveRORootHosts = []string{ip + "/255.255.255.255"} // we can't remove without netmask
log.Debug("Going to remove IP from RORootHosts: ", modifyHostPayload.RemoveRORootHosts[0])
}
}
if common.Contains(export.RWHosts, ip+"/255.255.255.255") {
modifyHostPayload.RemoveRWHosts = []string{ip + "/255.255.255.255"} // we can't remove without netmask
log.Debug("Going to remove IP from RWHosts: ", modifyHostPayload.RemoveRWHosts[0])
}
if common.Contains(export.RWRootHosts, ip+"/255.255.255.255") {
modifyHostPayload.RemoveRWRootHosts = []string{ip + "/255.255.255.255"} // we can't remove without netmask
log.Debug("Going to remove IP from RWRootHosts: ", modifyHostPayload.RemoveRWRootHosts[0])
}
// Detach host from nfs export
_, err = arr.GetClient().ModifyNFSExport(ctx, &modifyHostPayload, export.ID)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); !(ok && apiError.HostAlreadyRemovedFromNFSExport()) {
log.Debug("Error occured while modifying NFS export during UnPublishVolume", err.Error())
return nil, status.Errorf(codes.Internal,
"failure when removing new host to nfs export: %s", err.Error())
}
}
return &csi.ControllerUnpublishVolumeResponse{}, nil
}
return nil, status.Errorf(codes.InvalidArgument, "can't figure out protocol")
}
// GetServiceTag returns the service tag associated with an appliance
func GetServiceTag(ctx context.Context, req *csi.CreateVolumeRequest, arr *array.PowerStoreArray, volID string, protocol string) string {
var ap gopowerstore.ApplianceInstance
var vol gopowerstore.Volume
var f gopowerstore.FileSystem
var nas gopowerstore.NAS
var applianceName string
var err error
// Check if appliance id is present in PVC manifest
if applianceID, ok := (req.Parameters)["appliance_id"]; ok {
// Fetching appliance information using the appliance id
ap, err = arr.Client.GetAppliance(ctx, applianceID)
if err != nil {
log.Warn("Received error while calling GetAppliance ", err.Error())
}
} else {
if protocol != "nfs" {
vol, err = arr.Client.GetVolume(ctx, volID)
if err != nil {
log.Warn("Received error while calling GetVolume ", err.Error())
}
if vol.ApplianceID == "" {
log.Warn("Unable to fetch ApplianceID from the volume")
} else {
ap, err = arr.Client.GetAppliance(ctx, vol.ApplianceID)
if err != nil {
log.Warn("Received error while calling GetAppliance ", err.Error())
}
}
} else {
f, err = arr.Client.GetFS(ctx, volID)
if err != nil {
log.Warn("Received error while calling GetFS ", err.Error())
}
if f.NasServerID == "" {
log.Warn("Unable to fetch the NasServerID from the file system")
} else {
nas, err = arr.Client.GetNAS(ctx, f.NasServerID)
if err != nil {
log.Warn("Received error while calling GetNAS ", err.Error())
}
if nas.CurrentNodeID == "" {
log.Warn("Unable to fetch the CurrentNodeId from the nas server")
} else {
// Removing "-node-X" from the end of CurrentNodeId to get Appliance Name
applianceName = strings.Split(nas.CurrentNodeID, "-node-")[0]
// Fetching appliance information using the appliance name
ap, err = arr.Client.GetApplianceByName(ctx, applianceName)
if err != nil {
log.Warn("Received error while calling GetApplianceByName ", err.Error())
}
}
}
}
}
return ap.ServiceTag
}
// ValidateVolumeCapabilities checks if capabilities found in request are supported by driver.
func (s *Service) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
var (
supported = true
isBlock = accTypeIsBlock(req.VolumeCapabilities)
reason string
)
// Check that all access types are valid
if !checkValidAccessTypes(req.VolumeCapabilities) {
return &csi.ValidateVolumeCapabilitiesResponse{
Confirmed: nil,
Message: ErrUnknownAccessType,
}, status.Error(codes.Internal, ErrUnknownAccessType)
}
for _, vc := range req.VolumeCapabilities {
am := vc.GetAccessMode()
if am == nil {
continue
}
switch am.Mode {
case csi.VolumeCapability_AccessMode_UNKNOWN:
supported = false
reason = ErrUnknownAccessMode
break
// SINGLE_NODE_WRITER to be deprecated in future
case csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER:
break
case csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER:
break
case csi.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER:
break
case csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY:
break
case csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY:
break
case csi.VolumeCapability_AccessMode_MULTI_NODE_SINGLE_WRITER:
fallthrough
case csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER:
if !isBlock {
supported = false
reason = ErrNoMultiNodeWriter
}
break
default:
// This is to guard against new access modes not understood
supported = false
reason = ErrUnknownAccessMode
}
}
// for sanity
id := req.GetVolumeId()
id, arrayID, proto, _, _, err := array.ParseVolumeID(ctx, id, s.DefaultArray(), nil)
if err != nil {
return &csi.ValidateVolumeCapabilitiesResponse{}, status.Error(codes.NotFound, "No such volume")
}
if proto == "nfs" {
_, err := s.Arrays()[arrayID].Client.GetFS(ctx, id)
if err != nil {
return &csi.ValidateVolumeCapabilitiesResponse{
Confirmed: nil,
Message: "Failed to get volume",
}, status.Error(codes.NotFound, "Failed to get volume")
}
} else {
_, err := s.Arrays()[arrayID].Client.GetVolume(ctx, id)
if err != nil {
return &csi.ValidateVolumeCapabilitiesResponse{
Confirmed: nil,
Message: "Failed to get volume",
}, status.Error(codes.NotFound, "Failed to get volume")
}
}
if supported {
return &csi.ValidateVolumeCapabilitiesResponse{
Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
VolumeContext: req.VolumeContext,
VolumeCapabilities: req.VolumeCapabilities,
Parameters: req.Parameters,
},
Message: reason,
}, nil
}
return &csi.ValidateVolumeCapabilitiesResponse{
Confirmed: nil,
Message: reason,
}, status.Error(codes.Internal, reason)
}
// ListVolumes returns all accessible volumes from the storage array.
func (s *Service) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
var (
startToken int
maxEntries = int(req.GetMaxEntries())
)
if v := req.GetStartingToken(); v != "" {
i, err := strconv.ParseInt(v, 10, 32)
if err != nil {
return nil, status.Errorf(codes.Aborted, "unable to parse StartingToken: %v into uint32", v)
}
startToken = int(i)
}
// Call the common listVolumes code
source, nextToken, err := s.listPowerStoreVolumes(ctx, startToken, maxEntries)
if err != nil {
return nil, err
}
// Process the source volumes and make CSI Volumes
entries := make([]*csi.ListVolumesResponse_Entry, len(source))
for i, vol := range source {
entries[i] = &csi.ListVolumesResponse_Entry{
Volume: getCSIVolume(vol.ID, vol.Size),
}
}
return &csi.ListVolumesResponse{
Entries: entries,
NextToken: nextToken,
}, nil
}
// GetCapacity returns available capacity for a storage array.
func (s *Service) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
params := req.GetParameters()
// Get array from map
arrayID, ok := params[common.KeyArrayID]
var arr *array.PowerStoreArray
// If no ArrayIP was provided in storage class we just use default array
if !ok {
arr = s.DefaultArray()
} else {
arr, ok = s.Arrays()[arrayID]
if !ok {
return nil, status.Errorf(codes.Internal, "can't find array with provided id %s", arrayID)
}
}
capacity, err := arr.Client.GetCapacity(ctx)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
maxVolSize := getMaximumVolumeSize(ctx, arr)
if maxVolSize < 0 {
return &csi.GetCapacityResponse{
AvailableCapacity: capacity,
}, nil
}
maxVol := wrapperspb.Int64(maxVolSize)
return &csi.GetCapacityResponse{
AvailableCapacity: capacity,
MaximumVolumeSize: maxVol,
}, nil
}
func getMaximumVolumeSize(ctx context.Context, arr *array.PowerStoreArray) int64 {
valueInCache, found := getCachedMaximumVolumeSize(arr.GlobalID)
if !found || valueInCache < 0 {
defaultHeaders := arr.Client.GetCustomHTTPHeaders()
if defaultHeaders == nil {
defaultHeaders = make(http.Header)
}
customHeaders := defaultHeaders
customHeaders.Add("DELL-VISIBILITY", "internal")
arr.Client.SetCustomHTTPHeaders(customHeaders)
value, err := arr.Client.GetMaxVolumeSize(ctx)
if err != nil {
log.Debug(fmt.Sprintf("GetMaxVolumeSize returning: %v for Array having GlobalId %s", err, arr.GlobalID))
}
// reset custom header
customHeaders.Del("DELL-VISIBILITY")
arr.Client.SetCustomHTTPHeaders(customHeaders)
// Add a new entry to the MaximumVolumeSize
cacheMaximumVolumeSize(arr.GlobalID, value)
valueInCache = value
}
return valueInCache
}
func getCachedMaximumVolumeSize(key string) (int64, bool) {
mutex.Lock()
defer mutex.Unlock()
value, found := maxVolumesSizeForArray[key]
return value, found
}
func cacheMaximumVolumeSize(key string, value int64) {
mutex.Lock()
defer mutex.Unlock()
maxVolumesSizeForArray[key] = value
}
// ControllerGetCapabilities returns list of capabilities that are supported by the driver.
func (s *Service) ControllerGetCapabilities(_ context.Context, _ *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
newCap := func(cap csi.ControllerServiceCapability_RPC_Type) *csi.ControllerServiceCapability {
return &csi.ControllerServiceCapability{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: cap,
},
},
}
}
var capabilities []*csi.ControllerServiceCapability
for _, capability := range []csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
csi.ControllerServiceCapability_RPC_GET_CAPACITY,
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
csi.ControllerServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
} {
capabilities = append(capabilities, newCap(capability))
}
if s.isHealthMonitorEnabled {
for _, capability := range []csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_GET_VOLUME,
csi.ControllerServiceCapability_RPC_LIST_VOLUMES_PUBLISHED_NODES,
csi.ControllerServiceCapability_RPC_VOLUME_CONDITION,
} {
capabilities = append(capabilities, newCap(capability))
}
}
return &csi.ControllerGetCapabilitiesResponse{
Capabilities: capabilities,
}, nil
}
// CreateSnapshot creates a snapshot of the Volume or FileSystem.
func (s *Service) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
snapName := req.GetName()
if err := volumeNameValidation(snapName); err != nil {
return nil, err
}
// Validate snapshot volume sourceVolID
sourceVolID := req.GetSourceVolumeId()
if sourceVolID == "" {
return nil, status.Errorf(codes.InvalidArgument, "volume ID to be snapped is required")
}
id, arrayID, protocol, _, _, err := array.ParseVolumeID(ctx, sourceVolID, s.DefaultArray(), nil)
if err != nil {
return nil, err
}
arr, ok := s.Arrays()[arrayID]
if !ok {
return nil, status.Error(codes.InvalidArgument, "failed to find array with given ID")
}
var snapshotter VolumeSnapshotter
var sourceVolumeSize int64
if protocol == "nfs" {
f, err := arr.GetClient().GetFS(ctx, id)
if err == nil {
sourceVolumeSize = f.SizeTotal - ReservedSize
} else {
return &csi.CreateSnapshotResponse{}, status.Errorf(codes.Internal,
"can't find source volume '%s': %s", id, err.Error())
}
snapshotter = &NfsSnapshotter{}
} else {
f, err := arr.GetClient().GetVolume(ctx, id)
if err == nil {
sourceVolumeSize = f.Size
} else {
return &csi.CreateSnapshotResponse{}, status.Errorf(codes.Internal,
"can't find source volume '%s': %s", id, err.Error())
}
snapshotter = &SCSISnapshotter{}
}
var snapResponse *csi.Snapshot
// Check if snapshot with provided name already exists but has a different source volume id
existingSnapshot, err := snapshotter.GetExistingSnapshot(ctx, snapName, arr.GetClient())
if err == nil {
if existingSnapshot.GetSourceID() != id {
return nil, status.Errorf(codes.AlreadyExists,
"snapshot with name '%s' exists, but SourceVolumeId %s doesn't match", snapName, id)
}
snapResponse = getCSISnapshot(existingSnapshot.GetID(), id, existingSnapshot.GetSize())
} else {
resp, err := snapshotter.Create(ctx, snapName, id, arr.GetClient())
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.SnapshotNameIsAlreadyUse() {
existingSnapshot, err := snapshotter.GetExistingSnapshot(ctx, snapName, arr.GetClient())
if err != nil {
return nil, err
}
snapResponse = getCSISnapshot(existingSnapshot.GetID(), id, existingSnapshot.GetSize())
} else {
return nil, status.Error(codes.Internal, err.Error())
}
} else {
snapResponse = getCSISnapshot(resp.ID, id, sourceVolumeSize)
}
}
snapResponse.SnapshotId = snapResponse.SnapshotId + "/" + arrayID + "/" + protocol
return &csi.CreateSnapshotResponse{
Snapshot: snapResponse,
}, nil
}
// DeleteSnapshot deletes a snapshot of the Volume or FileSystem.
func (s *Service) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
snapID := req.GetSnapshotId()
if snapID == "" {
return nil, status.Errorf(codes.InvalidArgument, "snapshot ID to be deleted is required")
}
id, arrayID, protocol, _, _, err := array.ParseVolumeID(ctx, snapID, s.DefaultArray(), nil)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.NotFound() {
return &csi.DeleteSnapshotResponse{}, nil
}
return nil, err
}
arr, ok := s.Arrays()[arrayID]
if !ok {
return nil, status.Error(codes.InvalidArgument, "failed to find array with given ID")
}
if protocol == "nfs" {
_, err = arr.GetClient().GetFsSnapshot(ctx, id)
if err == nil {
_, err := arr.GetClient().DeleteFsSnapshot(ctx, id)
if err == nil {
return &csi.DeleteSnapshotResponse{}, nil
}
if apiError, ok := err.(gopowerstore.APIError); ok {
if apiError.NotFound() {
return &csi.DeleteSnapshotResponse{}, nil
}
}
return nil, err
}
} else {
snap, err := arr.GetClient().GetSnapshot(ctx, id)
if err == nil {
// we will check whether this snapshot is a part of volume group snapshot, if yes then we will delete the volume group snapshot
vgs, err := arr.GetClient().GetVolumeGroupsByVolumeID(ctx, snap.ID)
if len(vgs.VolumeGroup) != 0 && err == nil { // This means this snap is a part of VGS
_, err = arr.GetClient().DeleteVolumeGroup(ctx, vgs.VolumeGroup[0].ID)
if err == nil {
return &csi.DeleteSnapshotResponse{}, nil
}
}
_, err = arr.GetClient().DeleteSnapshot(ctx, nil, id)
if err == nil {
return &csi.DeleteSnapshotResponse{}, nil
}
if apiError, ok := err.(gopowerstore.APIError); ok {
if apiError.NotFound() {
return &csi.DeleteSnapshotResponse{}, nil
}
}
return nil, err
}
}
if apiError, ok := err.(gopowerstore.APIError); ok {
if apiError.NotFound() {
return &csi.DeleteSnapshotResponse{}, nil
}
}
return nil, err
}
// ListSnapshots list all accessible snapshots from the storage array.
func (s *Service) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
var (
startToken int
maxEntries = int(req.GetMaxEntries())
snapshotID string
sourceVolID string
)
if req.SnapshotId != "" {
snapshotID = req.SnapshotId
}
if req.SourceVolumeId != "" {
sourceVolID = req.SourceVolumeId
}
if v := req.GetStartingToken(); v != "" {
i, err := strconv.ParseInt(v, 10, 32)
if err != nil {
return nil, status.Errorf(codes.Aborted, "unable to parse StartingToken: %v into uint32", v)
}
startToken = int(i)
}
// Call the common listVolumes code
source, nextToken, err := s.listPowerStoreSnapshots(ctx, startToken, maxEntries, snapshotID, sourceVolID)
if err != nil {
return nil, err
}
if len(source) == 0 {
return &csi.ListSnapshotsResponse{}, nil
}
// Process the source volumes and make CSI Volumes
entries := make([]*csi.ListSnapshotsResponse_Entry, len(source))
for i, snap := range source {
size := snap.GetSize()
// Correct size of filesystem snapshot
if snap.GetType() == FilesystemSnapshotType {
size = size - ReservedSize
}
entries[i] = &csi.ListSnapshotsResponse_Entry{
Snapshot: getCSISnapshot(snap.GetID(), snap.GetSourceID(), size),
}
}
return &csi.ListSnapshotsResponse{
Entries: entries,
NextToken: nextToken,
}, nil
}
// ControllerExpandVolume resizes Volume or FileSystem by increasing available volume capacity in the storage array.
func (s *Service) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
id, arrayID, protocol, _, _, err := array.ParseVolumeID(ctx, req.VolumeId, s.DefaultArray(), nil)
if err != nil {
return nil, status.Errorf(codes.OutOfRange, "unable to parse the volume id")
}
requiredBytes := req.GetCapacityRange().GetRequiredBytes()
if requiredBytes > MaxVolumeSizeBytes {
return nil, status.Errorf(codes.OutOfRange, "volume exceeds allowed limit")
}
if protocol == "scsi" {
vol, err := s.Arrays()[arrayID].Client.GetVolume(ctx, id)
if err != nil {
return nil, status.Errorf(codes.OutOfRange, "detected SCSI protocol but wasn't able to fetch the volume info")
}
if vol.Size < requiredBytes {
_, err = s.Arrays()[arrayID].Client.ModifyVolume(context.Background(), &gopowerstore.VolumeModify{Size: requiredBytes}, id)
if err != nil {
return nil, err
}
return &csi.ControllerExpandVolumeResponse{CapacityBytes: requiredBytes, NodeExpansionRequired: true}, nil
}
return &csi.ControllerExpandVolumeResponse{}, nil
}
fs, err := s.Arrays()[arrayID].Client.GetFS(ctx, id)
if err == nil {
if fs.SizeTotal < requiredBytes {
_, err = s.Arrays()[arrayID].Client.ModifyFS(context.Background(), &gopowerstore.FSModify{Size: int(requiredBytes + ReservedSize)}, id)
if err != nil {
return nil, err
}
}
}
return &csi.ControllerExpandVolumeResponse{CapacityBytes: requiredBytes, NodeExpansionRequired: false}, nil
}
// ControllerGetVolume fetch current information about a volume
func (s *Service) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
id, arrayID, protocol, _, _, err := array.ParseVolumeID(ctx, req.VolumeId, s.DefaultArray(), nil)
if err != nil {
return nil, status.Errorf(codes.OutOfRange, "unable to parse the volume id")
}
var hosts []string
abnormal := false
message := ""
if protocol == "nfs" {
// check if filesystem exists
fs, err := s.Arrays()[arrayID].Client.GetFS(ctx, id)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); !ok || !apiError.NotFound() {
return nil, status.Errorf(codes.NotFound, "failed to find filesystem %s with error: %v", id, err.Error())
}
abnormal = true
message = fmt.Sprintf("Filesystem %s is not found", id)
} else {
// get exports for filesystem if exists
nfsExport, err := s.Arrays()[arrayID].Client.GetNFSExportByFileSystemID(ctx, fs.ID)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); !ok || !apiError.NotFound() {
return nil, status.Errorf(codes.NotFound, "failed to find nfs export for filesystem with error: %v", err.Error())
}
} else {
// get hosts publish to export
hosts = append(nfsExport.ROHosts, nfsExport.RORootHosts...)
hosts = append(hosts, nfsExport.RWHosts...)
hosts = append(hosts, nfsExport.RWRootHosts...)
}
}
} else {
// check if volume exists
vol, err := s.Arrays()[arrayID].Client.GetVolume(ctx, id)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); !ok || !apiError.NotFound() {
return nil, status.Errorf(codes.NotFound, "failed to find volume %s with error: %v", id, err.Error())
}
abnormal = true
message = fmt.Sprintf("Volume %s is not found", id)
} else {
// get hosts published to volume
hostMappings, err := s.Arrays()[arrayID].Client.GetHostVolumeMappingByVolumeID(ctx, id)
if err != nil {
return nil, status.Errorf(codes.NotFound, "failed to get host volume mapping for volume: %s with error: %v", id, err.Error())
}
for _, hostMapping := range hostMappings {
host, err := s.Arrays()[arrayID].Client.GetHost(ctx, hostMapping.HostID)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); !ok || !apiError.NotFound() {
return nil, status.Errorf(codes.NotFound, "failed to get host: %s with error: %v", hostMapping.HostID, err.Error())
}
} else {
hosts = append(hosts, host.Name)
}
}
// check if volume is in ready state
if vol.State != gopowerstore.VolumeStateEnumReady {
abnormal = true
message = fmt.Sprintf("Volume %s is in %s state", id, string(vol.State))
}
}
}
resp := &csi.ControllerGetVolumeResponse{
Volume: &csi.Volume{
VolumeId: id,
},
Status: &csi.ControllerGetVolumeResponse_VolumeStatus{
PublishedNodeIds: hosts,
VolumeCondition: &csi.VolumeCondition{
Abnormal: abnormal,
Message: message,
},
},
}
return resp, nil
}
// RegisterAdditionalServers registers replication extension
func (s *Service) RegisterAdditionalServers(server *grpc.Server) {
csiext.RegisterReplicationServer(server, s)
vgsext.RegisterVolumeGroupSnapshotServer(server, s)
podmon.RegisterPodmonServer(server, s)
}
// ProbeController probes the controller service
func (s *Service) ProbeController(_ context.Context, _ *commonext.ProbeControllerRequest) (*commonext.ProbeControllerResponse, error) {
ready := new(wrapperspb.BoolValue)
ready.Value = true
rep := new(commonext.ProbeControllerResponse)
rep.Ready = ready
rep.Name = common.Name
rep.VendorVersion = core.SemVer
rep.Manifest = common.Manifest
log.Debug(fmt.Sprintf("ProbeController returning: %v", rep.Ready.GetValue()))
return rep, nil
}
func (s *Service) listPowerStoreVolumes(ctx context.Context, startToken, maxEntries int) ([]gopowerstore.Volume, string, error) {
var volumes []gopowerstore.Volume
// Get the volumes from the cache if we can
for _, arr := range s.Arrays() {
v, err := arr.GetClient().GetVolumes(ctx)
if err != nil {
return nil, "", status.Errorf(codes.Internal, "unable to list volumes: %s", err.Error())
}
volumes = append(volumes, v...)
}
if startToken > len(volumes) {
return nil, "", status.Errorf(codes.Aborted, "startingToken=%d > len(volumes)=%d", startToken, len(volumes))
}
// Discern the number of remaining entries.
rem := len(volumes) - startToken
// If maxEntries is 0 or greater than the number of remaining entries then
// set max entries to the number of remaining entries.
if maxEntries == 0 || maxEntries > rem {
maxEntries = rem
}
// We can't really return more per page
if maxEntries > 700 {
maxEntries = 700
}
// Compute the next starting point; if at end reset
nextToken := startToken + maxEntries
nextTokenStr := ""
if nextToken < (startToken + rem) {
nextTokenStr = fmt.Sprintf("%d", nextToken)
}
return volumes[startToken : startToken+maxEntries], nextTokenStr, nil
}
func (s *Service) listPowerStoreSnapshots(ctx context.Context, startToken, maxEntries int, snapID, srcID string) ([]GeneralSnapshot, string, error) {
var generalSnapshots []GeneralSnapshot
if snapID == "" && srcID == "" {
log.Info("Requested all snapshots, iterating through arrays")
for _, arr := range s.Arrays() {
// List block snapshots
snaps, err := arr.GetClient().GetSnapshots(ctx)
if err != nil {
return nil, "", status.Errorf(codes.Internal, "unable to list block snapshots: %s", err.Error())
}
for _, snap := range snaps {
generalSnapshots = append(generalSnapshots, VolumeSnapshot(snap))
}
// List filesystem snapshots too
fsSnaps, err := arr.GetClient().GetFsSnapshots(ctx)
if err != nil {
return nil, "", status.Errorf(codes.Internal, "unable to list filesystem snapshots: %s", err.Error())
}
for _, snap := range fsSnaps {
generalSnapshots = append(generalSnapshots, FilesystemSnapshot(snap))
}
}
} else if snapID != "" {
log.Infof("Requested snapshot via snapshot id %s", snapID)
id, arrayID, protocol, _, _, err := array.ParseVolumeID(ctx, snapID, s.DefaultArray(), nil)
if err != nil {
log.Error(err)
return []GeneralSnapshot{}, "", nil
}
arr, ok := s.Arrays()[arrayID]
if !ok {
return nil, "", status.Errorf(codes.Internal, "unable to get array with arrayID %s", arrayID)
}
if protocol == "nfs" {
fsSnapshot, getErr := arr.GetClient().GetFsSnapshot(ctx, id)
if apiError, ok := getErr.(gopowerstore.APIError); ok && apiError.NotFound() {
// given snapshot id does not exist, should return empty response
return generalSnapshots, "", nil
}
if getErr != nil {
return nil, "", status.Errorf(codes.Internal, "unable to get filesystem snapshot: %s", getErr.Error())
}
log.Info(fsSnapshot)
fsSnapshot.ID = fsSnapshot.ID + "/" + arrayID + "/" + protocol
generalSnapshots = append(generalSnapshots, FilesystemSnapshot(fsSnapshot))
} else {
blockSnap, getErr := arr.GetClient().GetSnapshot(ctx, id)
if apiError, ok := getErr.(gopowerstore.APIError); ok && apiError.NotFound() {
// given snapshot id does not exist, should return empty response
return generalSnapshots, "", nil
}
if getErr != nil {
return nil, "", status.Errorf(codes.Internal, "unable to get block snapshot: %s", getErr.Error())
}
blockSnap.ID = blockSnap.ID + "/" + arrayID + "/" + protocol
generalSnapshots = append(generalSnapshots, VolumeSnapshot(blockSnap))
}
} else {
log.Infof("Requested snapshot via source id %s", srcID)
// This works VGS on single default array, But for multiple array scenario this default array should be changed to dynamic array
id, arrayID, protocol, _, _, err := array.ParseVolumeID(ctx, srcID, s.DefaultArray(), nil)
if err != nil {
log.Error(err)
return []GeneralSnapshot{}, "", nil
}
arr, ok := s.Arrays()[arrayID]
if !ok {
return nil, "", status.Errorf(codes.Internal, "unable to get array with arrayID %s", arrayID)
}
if protocol == "nfs" {
snaps, err := arr.GetClient().GetFsSnapshotsByVolumeID(ctx, id)
if err != nil {
return nil, "", status.Errorf(codes.Internal, "unable to list filesystem snapshots: %s", err.Error())
}
for _, snap := range snaps {
generalSnapshots = append(generalSnapshots, FilesystemSnapshot(snap))
}
} else {
snaps, err := arr.GetClient().GetSnapshotsByVolumeID(ctx, id)
if err != nil {
return nil, "", status.Errorf(codes.Internal, "unable to list block snapshots: %s", err.Error())
}
for _, snap := range snaps {
generalSnapshots = append(generalSnapshots, VolumeSnapshot(snap))
}
}
}
if startToken > len(generalSnapshots) {
return nil, "", status.Errorf(codes.Aborted, "startingToken=%d > len(generalSnapshots)=%d", startToken, len(generalSnapshots))
}
// Discern the number of remaining entries.
rem := len(generalSnapshots) - startToken
// If maxEntries is 0 or greater than the number of remaining entries then
// set max entries to the number of remaining entries.
if maxEntries == 0 || maxEntries > rem {
maxEntries = rem
}
// We can't really return more per page
if maxEntries > 300 {
maxEntries = 300
}
// Compute the next starting point; if at end reset
nextToken := startToken + maxEntries
nextTokenStr := ""
if nextToken < (startToken + rem) {
nextTokenStr = fmt.Sprintf("%d", nextToken)
}
return generalSnapshots[startToken : startToken+maxEntries], nextTokenStr, nil
}
/*
*
* Copyright © 2022-2023 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package controller provides CSI specification compatible controller service.
package controller
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
log "github.com/sirupsen/logrus"
"github.com/dell/csi-powerstore/v2/pkg/common"
)
// QueryArrayStatus make API call to the specified url to retrieve connection status
func (s *Service) QueryArrayStatus(ctx context.Context, url string) (bool, error) {
defer func() {
if err := recover(); err != nil {
log.Println("panic occurred in queryStatus:", err)
}
}()
client := http.Client{
Timeout: common.Timeout,
}
resp, err := client.Get(url)
log.Debugf("Received response %+v for url %s", resp, url)
if err != nil {
log.Errorf("failed to call API %s due to %s ", url, err.Error())
return false, err
}
defer resp.Body.Close() // #nosec G307
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
log.Errorf("failed to read API response due to %s ", err.Error())
return false, err
}
if resp.StatusCode != 200 {
log.Errorf("Found unexpected response from the server while fetching array status %d ", resp.StatusCode)
return false, fmt.Errorf("unexpected response from the server")
}
var statusResponse common.ArrayConnectivityStatus
err = json.Unmarshal(bodyBytes, &statusResponse)
if err != nil {
log.Errorf("unable to unmarshal and determine connectivity due to %s ", err)
return false, err
}
log.Infof("API Response received is %+v\n", statusResponse)
// responseObject has last success and last attempt timestamp in Unix format
timeDiff := statusResponse.LastAttempt - statusResponse.LastSuccess
tolerance := common.SetPollingFrequency(ctx)
currTime := time.Now().Unix()
// checking if the status response is stale and connectivity test is still running
// since nodeProbe is run at frequency tolerance/2, ideally below check should never be true
if (currTime - statusResponse.LastAttempt) > tolerance*2 {
log.Errorf("seems like connectivity test is not being run, current time is %d and last run was at %d", currTime, statusResponse.LastAttempt)
// considering connectivity is broken
return false, nil
}
log.Debugf("last connectivity was %d sec back, tolerance is %d sec", timeDiff, tolerance)
// give 2s leeway for tolerance check
if timeDiff <= tolerance+2 {
return true, nil
}
return false, nil
}
/*
*
* Copyright © 2021-2023 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package controller
import (
"context"
"net/http"
"strconv"
"github.com/dell/csi-powerstore/v2/pkg/common"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/dell/gopowerstore"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const (
// ReservedSize number of bytes reserved by creation of FS
ReservedSize = 1610612736
)
// Extra metadata field names for propagating to gopowerstore and beyond.
const (
// These are available when enabling --extra-create-metadata for the external-provisioner.
CSIPersistentVolumeName = "csi.storage.k8s.io/pv/name"
CSIPersistentVolumeClaimName = "csi.storage.k8s.io/pvc/name"
CSIPersistentVolumeClaimNamespace = "csi.storage.k8s.io/pvc/namespace"
// These map to the above fields in the form of HTTP header names.
HeaderPersistentVolumeName = "x-csi-pv-name"
HeaderPersistentVolumeClaimName = "x-csi-pv-claimname"
HeaderPersistentVolumeClaimNamespace = "x-csi-pv-namespace"
)
// VolumeCreator allows to call Create and similar operations used in CreateVolume call
type VolumeCreator interface {
// CheckSize validates that size is correct and returns size in bytes
CheckSize(ctx context.Context, cr *csi.CapacityRange, isAutoRoundOffFsSizeEnabled bool) (int64, error)
// CheckName validates volume name
CheckName(ctx context.Context, name string) error
// CheckIfAlreadyExists queries storage array if given volume already exists
CheckIfAlreadyExists(ctx context.Context, name string,
sizeInBytes int64, client gopowerstore.Client) (*csi.Volume, error)
// Create creates new volume
Create(ctx context.Context, req *csi.CreateVolumeRequest, sizeInBytes int64,
client gopowerstore.Client) (gopowerstore.CreateResponse, error)
// Create volume from snapshot
CreateVolumeFromSnapshot(ctx context.Context, snapshotSource *csi.VolumeContentSource_SnapshotSource,
volumeName string, sizeInBytes int64, parameters map[string]string, client gopowerstore.Client) (*csi.Volume, error)
// Create a volume from another volume
Clone(ctx context.Context, volumeSource *csi.VolumeContentSource_VolumeSource, volumeName string, sizeInBytes int64, parameters map[string]string, client gopowerstore.Client) (*csi.Volume, error)
}
// SCSICreator implementation of VolumeCreator for SCSI based (FC, iSCSI) volumes
type SCSICreator struct {
vg *gopowerstore.VolumeGroup
}
func setMetaData(reqParams map[string]string, createParams interface{}) {
// If the VolumeParam has a MetaData method, set the values accordingly.
if t, ok := createParams.(interface {
MetaData() http.Header
}); ok {
t.MetaData().Set(HeaderPersistentVolumeName, reqParams[CSIPersistentVolumeName])
t.MetaData().Set(HeaderPersistentVolumeClaimName, reqParams[CSIPersistentVolumeClaimName])
t.MetaData().Set(HeaderPersistentVolumeClaimNamespace, reqParams[CSIPersistentVolumeClaimNamespace])
} else {
log.Printf("warning: %T: no MetaData method exists, consider updating gopowerstore library.", createParams)
}
}
func setVolumeCreateAttributes(reqParams map[string]string, createParams *gopowerstore.VolumeCreate) {
if applianceID, ok := reqParams[common.KeyApplianceID]; ok {
createParams.ApplianceID = applianceID
}
if description, ok := reqParams[common.KeyVolumeDescription]; ok {
createParams.Description = description
}
if protectionPolicyID, ok := reqParams[common.KeyProtectionPolicyID]; ok {
createParams.ProtectionPolicyID = protectionPolicyID
}
if performancePolicyID, ok := reqParams[common.KeyPerformancePolicyID]; ok {
createParams.PerformancePolicyID = performancePolicyID
}
if appType, ok := reqParams[common.KeyAppType]; ok {
createParams.AppType = gopowerstore.AppTypeEnum(appType)
if appTypeOther, ok := reqParams[common.KeyAppTypeOther]; ok {
createParams.AppTypeOther = appTypeOther
}
}
}
func validateHostIOSize(hostIOSize string) string {
switch hostIOSize {
case gopowerstore.VMware8K,
gopowerstore.VMware16K,
gopowerstore.VMware32K,
gopowerstore.VMware64K:
return hostIOSize
}
return gopowerstore.VMware8K
}
func setFLRAttributes(reqParams map[string]string, createParams *gopowerstore.FsCreate) {
flrMode, flrModeFound := reqParams[common.KeyFlrCreateMode]
flrDefaultRetention, flrDefaultRetentionFound := reqParams[common.KeyFlrDefaultRetention]
flrMinimumRetention, flrMinimumRetentionFound := reqParams[common.KeyFlrMinRetention]
flrMaximumRetention, flrMaximumRetentionFound := reqParams[common.KeyFlrMaxRetention]
if flrModeFound ||
flrDefaultRetentionFound ||
flrMaximumRetentionFound ||
flrMinimumRetentionFound {
flrCreate := new(gopowerstore.FLRCreate)
if flrModeFound {
flrCreate.Mode = flrMode
}
if flrDefaultRetentionFound {
flrCreate.DefaultRetention = flrDefaultRetention
}
if flrMinimumRetentionFound {
flrCreate.MinimumRetention = flrMinimumRetention
}
if flrMaximumRetentionFound {
flrCreate.MaximumRetention = flrMaximumRetention
}
createParams.FlrCreate = *flrCreate
}
}
func setNFSCreateAttributes(reqParams map[string]string, createParams *gopowerstore.FsCreate) {
if description, ok := reqParams[common.KeyVolumeDescription]; ok {
createParams.Description = description
}
if configType, ok := reqParams[common.KeyConfigType]; ok {
createParams.ConfigType = configType
}
if accessPolicy, ok := reqParams[common.KeyAccessPolicy]; ok {
createParams.AccessPolicy = accessPolicy
}
if lockingPolicy, ok := reqParams[common.KeyLockingPolicy]; ok {
createParams.LockingPolicy = lockingPolicy
}
if folderRenamePolicy, ok := reqParams[common.KeyFolderRenamePolicy]; ok {
createParams.FolderRenamePolicy = folderRenamePolicy
}
if isAsyncMTimeEnabled, ok := reqParams[common.KeyIsAsyncMtimeEnabled]; ok {
if val, err := strconv.ParseBool(isAsyncMTimeEnabled); err == nil {
createParams.IsAsyncMTimeEnabled = val
}
}
if protectionPolicyID, ok := reqParams[common.KeyProtectionPolicyID]; ok {
createParams.ProtectionPolicyID = protectionPolicyID
}
if fileEventsPublishingMode, ok := reqParams[common.KeyFileEventsPublishingMode]; ok {
createParams.FileEventsPublishingMode = fileEventsPublishingMode
}
if hostIOSize, ok := reqParams[common.KeyHostIoSize]; ok {
createParams.HostIOSize = validateHostIOSize(hostIOSize)
}
setFLRAttributes(reqParams, createParams)
}
// CheckSize validates that size is correct and returns size in bytes
func (*SCSICreator) CheckSize(_ context.Context, cr *csi.CapacityRange, _ bool) (int64, error) {
minSize := cr.GetRequiredBytes()
maxSize := cr.GetLimitBytes()
if minSize == 0 {
minSize = MinVolumeSizeBytes
}
if maxSize == 0 {
maxSize = MaxVolumeSizeBytes
}
mod := minSize % VolumeSizeMultiple
if mod > 0 {
minSize = minSize + VolumeSizeMultiple - mod
}
if err := volumeSizeValidation(minSize, maxSize); err != nil {
return 0, err
}
return minSize, nil
}
// CheckName validates volume name
func (*SCSICreator) CheckName(_ context.Context, name string) error {
return volumeNameValidation(name)
}
// CheckIfAlreadyExists queries storage array if Volume with given name exists
func (*SCSICreator) CheckIfAlreadyExists(ctx context.Context, name string, sizeInBytes int64, client gopowerstore.Client) (*csi.Volume, error) {
alreadyExistVolume, err := client.GetVolumeByName(ctx, name)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't find volume '%s': %s", name, err.Error())
}
if alreadyExistVolume.Size < sizeInBytes {
return nil, status.Errorf(codes.AlreadyExists,
"volume '%s' already exists but is incompatible volume size: %d < %d",
name, alreadyExistVolume.Size, sizeInBytes)
}
volumeResponse := getCSIVolume(alreadyExistVolume.ID, alreadyExistVolume.Size)
return volumeResponse, nil
}
// Create creates new block volume on storage array
func (sc *SCSICreator) Create(ctx context.Context, req *csi.CreateVolumeRequest, sizeInBytes int64, client gopowerstore.Client) (gopowerstore.CreateResponse, error) {
name := req.GetName()
metadata := map[string]string{
"k8s_pvol_name": req.Parameters[CSIPersistentVolumeName],
"k8s_claim_name": req.Parameters[CSIPersistentVolumeClaimName],
"k8s_claim_namespace": req.Parameters[CSIPersistentVolumeClaimNamespace],
}
var reqParams *gopowerstore.VolumeCreate
defaultHeaders := client.GetCustomHTTPHeaders()
if defaultHeaders == nil {
defaultHeaders = make(http.Header)
}
customHeaders := defaultHeaders
k8sMetadataSupported := common.IsK8sMetadataSupported(client)
if k8sMetadataSupported &&
metadata["k8s_pvol_name"] != "" &&
metadata["k8s_claim_name"] != "" &&
metadata["k8s_claim_namespace"] != "" {
customHeaders.Add("DELL-VISIBILITY", "internal")
client.SetCustomHTTPHeaders(customHeaders)
reqParams = &gopowerstore.VolumeCreate{Name: &name, Size: &sizeInBytes, Metadata: &metadata}
} else {
reqParams = &gopowerstore.VolumeCreate{Name: &name, Size: &sizeInBytes}
}
if sc.vg != nil {
reqParams.VolumeGroupID = sc.vg.ID
} else if vgID, ok := req.Parameters[common.KeyVolumeGroupID]; ok {
reqParams.VolumeGroupID = vgID
}
setMetaData(req.Parameters, reqParams)
setVolumeCreateAttributes(req.Parameters, reqParams)
resp, err := client.CreateVolume(ctx, reqParams)
// reset custom header
customHeaders.Del("DELL-VISIBILITY")
client.SetCustomHTTPHeaders(customHeaders)
return resp, err
}
// CreateVolumeFromSnapshot create a volume from an existing snapshot.
// The snapshotSource gives the SnapshotId which is the volume to be replicated.
func (*SCSICreator) CreateVolumeFromSnapshot(ctx context.Context, snapshotSource *csi.VolumeContentSource_SnapshotSource,
volumeName string, sizeInBytes int64, parameters map[string]string, client gopowerstore.Client,
) (*csi.Volume, error) {
var volumeResponse *csi.Volume
// Lookup the volume source volume.
sourceVol, err := client.GetVolume(ctx, snapshotSource.SnapshotId)
if err != nil {
return nil, status.Errorf(codes.NotFound, "volume snapshot not found: %s", snapshotSource.SnapshotId)
}
if sourceVol.Size != sizeInBytes {
return nil, status.Errorf(codes.InvalidArgument,
"snapshot %s has incompatible size %d bytes with requested %d bytes",
snapshotSource.SnapshotId, sourceVol.Size, sizeInBytes)
}
createParams := gopowerstore.VolumeClone{
Name: &volumeName,
Description: nil,
}
setMetaData(parameters, &createParams)
volume, err := client.CreateVolumeFromSnapshot(ctx, &createParams, snapshotSource.SnapshotId)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't create volume: %s", snapshotSource.SnapshotId)
}
volumeResponse = getCSIVolumeFromSnapshot(volume.ID, snapshotSource, sizeInBytes)
volumeResponse.VolumeContext = parameters
return volumeResponse, nil
}
// Clone creates a clone of a Volume
func (*SCSICreator) Clone(ctx context.Context, volumeSource *csi.VolumeContentSource_VolumeSource,
volumeName string, sizeInBytes int64, parameters map[string]string, client gopowerstore.Client,
) (*csi.Volume, error) {
var volumeResponse *csi.Volume
// Lookup the volume source volume.
sourceVol, err := client.GetVolume(ctx, volumeSource.VolumeId)
if err != nil {
return nil, status.Errorf(codes.NotFound, "volume not found: %s", volumeSource.VolumeId)
}
if sourceVol.Size != sizeInBytes {
return nil, status.Errorf(codes.InvalidArgument,
"volume %s has incompatible size %d bytes with requested %d bytes",
volumeSource.VolumeId, sourceVol.Size, sizeInBytes)
}
createParams := gopowerstore.VolumeClone{
Name: &volumeName,
Description: nil,
}
setMetaData(parameters, &createParams)
volume, err := client.CloneVolume(ctx, &createParams, volumeSource.VolumeId)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't clone volume: %s", err.Error())
}
volumeResponse = &csi.Volume{
CapacityBytes: sizeInBytes,
VolumeId: volume.ID,
VolumeContext: parameters,
ContentSource: &csi.VolumeContentSource{
Type: &csi.VolumeContentSource_Volume{
Volume: volumeSource,
},
},
}
return volumeResponse, nil
}
func getCSIVolumeFromSnapshot(VolumeID string, snapshotSource *csi.VolumeContentSource_SnapshotSource, size int64) *csi.Volume {
volume := &csi.Volume{
CapacityBytes: size,
VolumeId: VolumeID,
ContentSource: &csi.VolumeContentSource{
Type: &csi.VolumeContentSource_Snapshot{
Snapshot: snapshotSource,
},
},
}
return volume
}
func getCSIVolumeFromClone(VolumeID string, volumeSource *csi.VolumeContentSource_VolumeSource, size int64) *csi.Volume {
volume := &csi.Volume{
CapacityBytes: size,
VolumeId: VolumeID,
ContentSource: &csi.VolumeContentSource{
Type: &csi.VolumeContentSource_Volume{
Volume: volumeSource,
},
},
}
return volume
}
// NfsCreator implementation of VolumeCreator for NFS volumes
type NfsCreator struct {
nasName string
}
// CheckSize validates that size is correct and returns size in bytes
func (*NfsCreator) CheckSize(_ context.Context, cr *csi.CapacityRange, isAutoRoundOffFsSizeEnabled bool) (int64, error) {
minSize := cr.GetRequiredBytes()
maxSize := cr.GetLimitBytes()
if minSize == 0 {
minSize = MinVolumeSizeBytes
}
if maxSize == 0 {
maxSize = MaxVolumeSizeBytes
}
mod := minSize % VolumeSizeMultiple
if mod > 0 {
minSize = minSize + VolumeSizeMultiple - mod
}
// TODO: This roundoff logic to be removed once platform supports minimum filesystem size
if isAutoRoundOffFsSizeEnabled && minSize < MinFilesystemSizeBytes {
log.Warn("Auto round off Filesystem size has been enabled! Rounding off PVC size to 3Gi.")
return MinFilesystemSizeBytes, nil
}
if err := volumeSizeValidation(minSize, maxSize); err != nil {
return 0, err
}
return minSize, nil
}
// CheckName validates volume name
func (*NfsCreator) CheckName(_ context.Context, name string) error {
return volumeNameValidation(name)
}
// CheckIfAlreadyExists queries storage array if FileSystem with given name exists
func (*NfsCreator) CheckIfAlreadyExists(ctx context.Context, name string, sizeInBytes int64, client gopowerstore.Client) (*csi.Volume, error) {
alreadyExistVolume, err := client.GetFSByName(ctx, name)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't find filesystem '%s': %s", name, err.Error())
}
if alreadyExistVolume.SizeTotal < sizeInBytes {
return nil, status.Errorf(codes.AlreadyExists,
"filesystem '%s' already exists but is incompatible volume size: %d < %d",
name, alreadyExistVolume.SizeTotal, sizeInBytes)
}
volumeResponse := getCSIVolume(alreadyExistVolume.ID, alreadyExistVolume.SizeTotal)
return volumeResponse, nil
}
// Create creates new FileSystem on storage array
func (c *NfsCreator) Create(ctx context.Context, req *csi.CreateVolumeRequest, sizeInBytes int64, client gopowerstore.Client) (gopowerstore.CreateResponse, error) {
nas, err := client.GetNASByName(ctx, c.nasName)
if err != nil {
return gopowerstore.CreateResponse{}, err
}
reqParams := &gopowerstore.FsCreate{
Name: req.GetName(),
NASServerID: nas.ID,
Size: sizeInBytes + ReservedSize,
}
setMetaData(req.Parameters, reqParams)
setNFSCreateAttributes(req.Parameters, reqParams)
return client.CreateFS(ctx, reqParams)
}
// CreateVolumeFromSnapshot create a FileSystem from an existing FileSystem snapshot.
func (*NfsCreator) CreateVolumeFromSnapshot(ctx context.Context, snapshotSource *csi.VolumeContentSource_SnapshotSource,
volumeName string, sizeInBytes int64, parameters map[string]string, client gopowerstore.Client,
) (*csi.Volume, error) {
var volumeResponse *csi.Volume
// Lookup the volume source volume.
sourceVol, err := client.GetFS(ctx, snapshotSource.SnapshotId)
if err != nil {
return nil, status.Errorf(codes.NotFound, "fs snapshot not found: %s", snapshotSource.SnapshotId)
}
if sourceVol.SizeTotal != sizeInBytes+ReservedSize {
return nil, status.Errorf(codes.InvalidArgument,
"snapshot %s has incompatible size %d bytes (additional %d bytes) with requested %d bytes",
snapshotSource.SnapshotId, sourceVol.SizeTotal, ReservedSize, sizeInBytes)
}
createParams := gopowerstore.FsClone{
Name: &volumeName,
Description: nil,
}
setMetaData(parameters, &createParams)
volume, err := client.CreateFsFromSnapshot(ctx, &createParams, snapshotSource.SnapshotId)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't create fs: %s", snapshotSource.SnapshotId)
}
volumeResponse = getCSIVolumeFromSnapshot(volume.ID, snapshotSource, sizeInBytes)
volumeResponse.VolumeContext = parameters
return volumeResponse, nil
}
// Clone creates a clone of a FileSystem
func (*NfsCreator) Clone(ctx context.Context, volumeSource *csi.VolumeContentSource_VolumeSource,
volumeName string, sizeInBytes int64, parameters map[string]string, client gopowerstore.Client,
) (*csi.Volume, error) {
var volumeResponse *csi.Volume
// Lookup the volume source volume.
sourceVol, err := client.GetFS(ctx, volumeSource.VolumeId)
if err != nil {
return nil, status.Errorf(codes.NotFound, "fs not found: %s", volumeSource.VolumeId)
}
if sourceVol.SizeTotal != sizeInBytes+ReservedSize {
return nil, status.Errorf(codes.InvalidArgument,
"fs %s has incompatible size %d bytes (additional %d bytes) with requested %d bytes",
volumeSource.VolumeId, sourceVol.SizeTotal, ReservedSize, sizeInBytes)
}
createParams := gopowerstore.FsClone{
Name: &volumeName,
Description: nil,
}
setMetaData(parameters, &createParams)
volume, err := client.CloneFS(ctx, &createParams, volumeSource.VolumeId)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't clone fs: %s", err.Error())
}
volumeResponse = &csi.Volume{
CapacityBytes: sizeInBytes,
VolumeId: volume.ID,
VolumeContext: parameters,
ContentSource: &csi.VolumeContentSource{
Type: &csi.VolumeContentSource_Volume{
Volume: volumeSource,
},
},
}
return volumeResponse, nil
}
/*
*
* Copyright © 2022-2024 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package controller
import (
"context"
"fmt"
"strings"
"time"
"github.com/dell/csi-powerstore/v2/pkg/array"
"github.com/dell/csi-powerstore/v2/pkg/common"
podmon "github.com/dell/dell-csi-extensions/podmon"
vgsext "github.com/dell/dell-csi-extensions/volumeGroupSnapshot"
"github.com/dell/gopowerstore"
"github.com/go-openapi/strfmt"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// StateReady resembles ready state
const StateReady = "Ready"
// CreateVolumeGroupSnapshot creates volume group snapshot
func (s *Service) CreateVolumeGroupSnapshot(ctx context.Context, request *vgsext.CreateVolumeGroupSnapshotRequest) (*vgsext.CreateVolumeGroupSnapshotResponse, error) {
log.Infof("CreateVolumeGroupSnapshot called with req: %v", request)
err := validateCreateVGSreq(request)
if err != nil {
log.Errorf("Error from CreateVolumeGroupSnapshot: %v ", err)
return nil, err
}
var reqParams gopowerstore.VolumeGroupSnapshotCreate
reqParams.Name = request.GetName()
reqParams.Description = request.GetDescription()
parsedVolHandle := strings.Split(request.SourceVolumeIDs[0], "/")
var arr string
if len(parsedVolHandle) >= 2 {
arr = parsedVolHandle[1]
}
var sourceVols []string
var volGroup gopowerstore.VolumeGroup
var snapsList []*vgsext.Snapshot
var int64CreationTime int64
var existingVgID string
for _, v := range request.GetSourceVolumeIDs() {
sourceVols = append(sourceVols, strings.Split(v, "/")[0])
}
// To create volume group
vgParams := gopowerstore.VolumeGroupCreate{
Name: request.GetName(),
Description: request.GetDescription(),
VolumeIDs: sourceVols,
}
gotVg, err := s.Arrays()[arr].GetClient().GetVolumeGroupByName(ctx, request.GetName())
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); !(ok && apiError.NotFound()) {
return nil, status.Errorf(codes.Internal, "Error getting volume group by name: %s", err.Error())
}
}
// Check whether volume group already exists, if yes proceed to create a snapshot else create a new volume group
if gotVg.ID != "" {
// taking the existing volume group to re-create
existingVgID = gotVg.ID
// add members to existing volume group before taking snapshot
_, err := s.Arrays()[arr].GetClient().AddMembersToVolumeGroup(ctx, &gopowerstore.VolumeGroupMembers{VolumeIDs: sourceVols}, existingVgID)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); !(ok && apiError.VolumeNameIsAlreadyUse()) {
return nil, status.Errorf(codes.Internal, "Error adding volume group members: %s", err.Error())
}
}
} else {
r, err := s.Arrays()[arr].GetClient().GetVolumeGroupsByVolumeID(ctx, vgParams.VolumeIDs[0])
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); !(ok && apiError.NotFound()) {
return nil, status.Errorf(codes.Internal, "Error getting volume group by volume ID: %s", err.Error())
}
}
if len(r.VolumeGroup) == 0 {
resp, err := s.Arrays()[arr].GetClient().CreateVolumeGroup(ctx, &vgParams)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); !(ok && apiError.VolumeNameIsAlreadyUse()) {
return nil, status.Errorf(codes.Internal, "Error creating volume group: %s", err.Error())
}
}
if resp.ID != "" {
existingVgID = resp.ID
}
} else {
existingVgID = r.VolumeGroup[0].ID
}
}
if existingVgID != "" {
resp, err := s.Arrays()[arr].GetClient().CreateVolumeGroupSnapshot(ctx, existingVgID, &reqParams)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); !(ok && apiError.VolumeNameIsAlreadyUse()) {
return nil, status.Errorf(codes.Internal, "Error creating volume group snapshot: %s", err.Error())
}
}
volGroup, err = s.Arrays()[arr].GetClient().GetVolumeGroup(ctx, resp.ID)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); !(ok && apiError.VolumeNameIsAlreadyUse()) {
return nil, status.Errorf(codes.Internal, "Error getting volume group snapshot: %s", err.Error())
}
}
etime, _ := time.Parse(time.RFC3339, volGroup.CreationTimeStamp)
int64CreationTime = etime.Unix() * 1000000000 // we need to convert to nano seconds
for _, v := range volGroup.Volumes {
var snapState bool
if v.State == StateReady {
snapState = true
}
volID := strings.Split(request.SourceVolumeIDs[0], "/")
if len(volID) >= 3 {
snapsList = append(snapsList, &vgsext.Snapshot{
Name: v.Name,
SnapId: v.ID + "/" + arr + "/" + volID[2],
ReadyToUse: snapState,
CapacityBytes: v.Size,
SourceId: v.ProtectionData.SourceID + "/" + arr + "/" + volID[2],
CreationTime: int64CreationTime,
})
}
}
}
return &vgsext.CreateVolumeGroupSnapshotResponse{
SnapshotGroupID: volGroup.ID,
Snapshots: snapsList,
CreationTime: int64CreationTime,
}, nil
}
// validate if request has VGS name, and VGS name must be less than 28 chars
func validateCreateVGSreq(request *vgsext.CreateVolumeGroupSnapshotRequest) error {
if request.Name == "" {
err := status.Error(codes.InvalidArgument, "CreateVolumeGroupSnapshotRequest needs Name to be set")
log.Errorf("Error from validateCreateVGSreq: %v ", err)
return err
}
// name must be less than 28 chars, because we name snapshots with -<index>, and index can at most be 3 chars
if len(request.Name) > 27 {
err := status.Errorf(codes.InvalidArgument, "Requested name %s longer than 27 character max", request.Name)
log.Errorf("Error from validateCreateVGSreq: %v ", err)
return err
}
if len(request.SourceVolumeIDs) == 0 {
err := status.Errorf(codes.InvalidArgument, "Source volumes are not present")
log.Errorf("Error from validateCreateVGSreq: %v ", err)
return err
}
return nil
}
// ValidateVolumeHostConnectivity menthod will be called by podmon sidecars to check host connectivity with array
func (s *Service) ValidateVolumeHostConnectivity(ctx context.Context, req *podmon.ValidateVolumeHostConnectivityRequest) (*podmon.ValidateVolumeHostConnectivityResponse, error) {
// ctx, log, _ := GetRunIDLog(ctx)
log.Infof("ValidateVolumeHostConnectivity called %+v", req)
rep := &podmon.ValidateVolumeHostConnectivityResponse{
Messages: make([]string, 0),
}
if (len(req.GetVolumeIds()) == 0 || len(req.GetArrayId()) == 0) && len(req.GetNodeId()) == 0 {
// This is a nop call just testing the interface is present
rep.Messages = append(rep.Messages, "ValidateVolumeHostConnectivity is implemented")
return rep, nil
}
if req.GetNodeId() == "" {
return nil, fmt.Errorf("the NodeID is a required field")
}
// create the map of all the array with array's GloabalID as key
globalIDs := make(map[string]bool)
globalID := req.GetArrayId()
if globalID == "" {
if len(req.GetVolumeIds()) == 0 {
log.Info("neither globalId nor volumeID is present in request")
globalIDs[s.DefaultArray().GlobalID] = true
}
// for loop req.GetVolumeIds()
for _, volID := range req.GetVolumeIds() {
_, globalID, _, _, _, err := array.ParseVolumeID(ctx, volID, s.DefaultArray(), nil)
if err != nil || globalID == "" {
log.Errorf("unable to retrieve array's globalID after parsing volumeID")
globalIDs[s.DefaultArray().GlobalID] = true
} else {
globalIDs[globalID] = true
}
}
} else {
globalIDs[globalID] = true
}
// Go through each of the globalIDs
for globalID := range globalIDs {
// First - check if the array is visible from the node
err := s.checkIfNodeIsConnected(ctx, globalID, req.GetNodeId(), rep)
if err != nil {
return rep, err
}
// Check for IOinProgress only when volumes IDs are present in the request as the field is required only in the latter case also to reduce number of calls to the API making it efficient
if len(req.GetVolumeIds()) > 0 {
// Get array config
for _, volID := range req.GetVolumeIds() {
id, globalIDForVol, protocol, _, _, _ := array.ParseVolumeID(ctx, volID, s.DefaultArray(), nil)
if globalIDForVol != globalID {
log.Errorf("Recived globalId from podman is %s and retrieved from array is %s ", globalID, globalIDForVol)
return nil, fmt.Errorf("invalid globalId %s is provided", globalID)
}
arraysConfig, err := s.GetOneArray(globalID)
if err != nil || arraysConfig == nil {
log.Error("Failed to get array config with error ", err.Error())
return nil, err
}
// check if any IO is inProgress for the current globalID/array
err = s.IsIOInProgress(ctx, id, arraysConfig, protocol)
if err == nil {
rep.IosInProgress = true
return rep, nil
}
}
}
}
log.Infof("ValidateVolumeHostConnectivity reply %+v", rep)
return rep, nil
}
// checkIfNodeIsConnected looks at the 'nodeId' to determine if there is connectivity to the 'arrayId' array.
// The 'rep' object will be filled with the results of the check.
func (s *Service) checkIfNodeIsConnected(ctx context.Context, arrayID string, nodeID string, rep *podmon.ValidateVolumeHostConnectivityResponse) error {
log.Infof("Checking if array %s is connected to node %s", arrayID, nodeID)
var message string
rep.Connected = false
nodeIP := common.GetIPListFromString(nodeID)
if len(nodeIP) == 0 {
log.Errorf("failed to parse node ID '%s'", nodeID)
return fmt.Errorf("failed to parse node ID")
}
ip := nodeIP[len(nodeIP)-1]
// form url to call array on node
url := "http://" + ip + common.APIPort + common.ArrayStatus + "/" + arrayID
connected, err := s.QueryArrayStatus(ctx, url)
if err != nil {
message = fmt.Sprintf("connectivity unknown for array %s to node %s due to %s", arrayID, nodeID, err)
log.Error(message)
rep.Messages = append(rep.Messages, message)
log.Errorf("%s", err.Error())
}
if connected {
rep.Connected = true
message = fmt.Sprintf("array %s is connected to node %s", arrayID, nodeID)
} else {
message = fmt.Sprintf("array %s is not connected to node %s", arrayID, nodeID)
}
log.Info(message)
rep.Messages = append(rep.Messages, message)
return nil
}
// IsIOInProgress function check the IO operation status on array
func (s *Service) IsIOInProgress(ctx context.Context, volID string, arrayConfig *array.PowerStoreArray, protocol string) (err error) {
// Call PerformanceMetricsByVolume or PerformanceMetricsByFileSystem in gopowerstore based on the volume type
if protocol == "scsi" {
resp, err := arrayConfig.Client.PerformanceMetricsByVolume(ctx, volID, gopowerstore.TwentySec)
if err != nil {
log.Errorf("Error %v while checking IsIOInProgress for array having globalId %s for volumeId %s", err.Error(), arrayConfig.GlobalID, volID)
return fmt.Errorf("error %v while while checking IsIOInProgress", err.Error())
}
// check last four entries status recieved in the response
for i := len(resp) - 1; i >= (len(resp)-4) && i >= 0; i-- {
if resp[i].TotalIops > 0.0 && checkIfEntryIsLatest(resp[i].CommonMetricsFields.Timestamp) {
return nil
}
}
return fmt.Errorf("no IOInProgress")
}
// nfs volume type logic
resp, err := arrayConfig.Client.PerformanceMetricsByFileSystem(ctx, volID, gopowerstore.TwentySec)
if err != nil {
log.Errorf("Error %v while checking IsIOInProgress for array having globalId %s for volumeId %s", err.Error(), arrayConfig.GlobalID, volID)
return fmt.Errorf("error %v while while checking IsIOInProgress", err.Error())
}
// check last four entries status recieved in the response
for i := len(resp) - 1; i >= len(resp)-4 && i >= 0; i-- {
if resp[i].TotalIops > 0.0 && checkIfEntryIsLatest(resp[i].CommonMetricsFields.Timestamp) {
return nil
}
}
return fmt.Errorf("no IOInProgress")
}
func checkIfEntryIsLatest(timestamp strfmt.DateTime) bool {
RFC3339MillisNoColon := "2006-01-02T15:04:05Z"
stringTime := timestamp.String()
timeFromResponse, err := time.Parse(RFC3339MillisNoColon, stringTime)
if err != nil {
log.Errorf("error in parsing the time recieved in the response %v", err)
return false
}
log.Debugf("timestamp recieved from the response body is %v", timeFromResponse)
currentTime := time.Now().UTC()
log.Debugf("current time %v", currentTime)
if currentTime.Sub(timeFromResponse).Seconds() < 60 {
log.Debug("found a fresh metric")
return true
}
return false
}
/*
*
* Copyright © 2021-2023 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package controller
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/dell/csi-powerstore/v2/pkg/common"
"github.com/dell/gopowerstore"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// VolumePublisher allows to publish a volume
type VolumePublisher interface {
// CheckIfVolumeExists queries storage array if given volume already exists
CheckIfVolumeExists(ctx context.Context, client gopowerstore.Client, volID string) error
// Publish does the steps necessary for volume to be available on the node
Publish(ctx context.Context, req *csi.ControllerPublishVolumeRequest, client gopowerstore.Client,
kubeNodeID string, volumeID string) (*csi.ControllerPublishVolumeResponse, error)
}
// SCSIPublisher implementation of VolumePublisher for SCSI based (FC, iSCSI) volumes
type SCSIPublisher struct{}
// Publish publishes Volume by attaching it to the host
func (s *SCSIPublisher) Publish(ctx context.Context, req *csi.ControllerPublishVolumeRequest, client gopowerstore.Client,
kubeNodeID string, volumeID string,
) (*csi.ControllerPublishVolumeResponse, error) {
volume, err := client.GetVolume(ctx, volumeID)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.NotFound() {
return nil, status.Errorf(codes.NotFound, "volume with ID '%s' not found", volumeID)
}
return nil, status.Errorf(codes.Internal, "failure checking volume status for volume publishing: %s", err.Error())
}
publishContext := make(map[string]string)
var node gopowerstore.Host
node, err = client.GetHostByName(ctx, kubeNodeID)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.HostIsNotExist() {
// We need additional check here since we can just have host without ip in it
ipList := common.GetIPListFromString(kubeNodeID)
if ipList == nil || len(ipList) == 0 {
return nil, status.Errorf(codes.NotFound, "can't find IP in node ID")
}
ip := ipList[len(ipList)-1]
nodeID := kubeNodeID[:len(kubeNodeID)-len(ip)-1]
node, err = client.GetHostByName(ctx, nodeID)
if err != nil {
return nil, status.Errorf(codes.NotFound, "host with k8s node ID '%s' not found", kubeNodeID)
}
} else {
return nil, status.Errorf(codes.Internal, "failure checking host '%s' status for volume publishing: %s",
kubeNodeID, err.Error())
}
}
mapping, err := client.GetHostVolumeMappingByVolumeID(ctx, volume.ID)
if err != nil {
return nil, status.Errorf(codes.Internal,
"failed to get mapping for volume with ID '%s': %s", volume.ID, err.Error())
}
err = s.addTargetsInfoToPublishContext(publishContext, volume.ApplianceID, client)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not get scsi Targets: %s", err.Error())
}
mappingCount := len(mapping)
// Check if the volume is already attached to some host
for _, m := range mapping {
if m.HostID == node.ID {
log.Debug("Volume already mapped")
s.addLUNIDToPublishContext(publishContext, m, volume)
return &csi.ControllerPublishVolumeResponse{
PublishContext: publishContext,
}, nil
}
}
if mappingCount != 0 {
switch req.VolumeCapability.AccessMode.Mode {
case csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY,
csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER,
csi.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER:
log.Error(fmt.Sprintf(
"ControllerPublishVolume: Volume present in a different lun mapping - '%s'",
mapping[0].HostID))
return nil, status.Errorf(
codes.FailedPrecondition,
"volume already present in a different lun mapping on node '%s'",
mapping[0].HostID)
}
}
// Attach volume to host
log.Debugf("Attach volume %s to host %s", volume.ID, node.ID)
params := gopowerstore.HostVolumeAttach{VolumeID: &volume.ID}
_, err = client.AttachVolumeToHost(ctx, node.ID, ¶ms)
if err != nil {
return nil, status.Errorf(codes.Internal,
"failed to attach volume with ID '%s' to host with ID '%s': %s", volume.ID, node.ID, err.Error())
}
mapping, err = client.GetHostVolumeMappingByVolumeID(ctx, volume.ID)
if err != nil {
return nil, status.Errorf(codes.Internal,
"failed to get mapping for volume with ID '%s' after attaching: %s", volume.ID, err.Error())
}
for _, m := range mapping {
if m.HostID == node.ID {
s.addLUNIDToPublishContext(publishContext, m, volume)
return &csi.ControllerPublishVolumeResponse{PublishContext: publishContext}, nil
}
}
return nil, status.Errorf(codes.Internal,
"failed to find mapping of volume with ID '%s' to host '%s'", volume.ID, node.ID)
}
// CheckIfVolumeExists queries storage array if Volume with given name exists
func (s *SCSIPublisher) CheckIfVolumeExists(ctx context.Context, client gopowerstore.Client, volID string) error {
_, err := client.GetVolume(ctx, volID)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.NotFound() {
return status.Errorf(codes.NotFound, "volume with ID '%s' not found", volID)
}
return status.Errorf(codes.Internal, "failure checking volume status for volume publishing: %s", err.Error())
}
return nil
}
func (s *SCSIPublisher) addLUNIDToPublishContext(
publishContext map[string]string,
mapping gopowerstore.HostVolumeMapping,
volume gopowerstore.Volume,
) {
publishContext[common.PublishContextDeviceWWN] = strings.TrimPrefix(volume.Wwn, common.WWNPrefix)
publishContext[common.PublishContextLUNAddress] = strconv.FormatInt(mapping.LogicalUnitNumber, 10)
}
func (s *SCSIPublisher) addTargetsInfoToPublishContext(
publishContext map[string]string, volumeApplianceID string, client gopowerstore.Client,
) error {
iscsiTargetsInfo, err := common.GetISCSITargetsInfoFromStorage(client, volumeApplianceID)
if err != nil {
log.Error("error unable to get iSCSI targets from array", err)
}
for i, t := range iscsiTargetsInfo {
publishContext[fmt.Sprintf("%s%d", common.PublishContextISCSIPortalsPrefix, i)] = t.Portal
publishContext[fmt.Sprintf("%s%d", common.PublishContextISCSITargetsPrefix, i)] = t.Target
}
fcTargetsInfo, err := common.GetFCTargetsInfoFromStorage(client, volumeApplianceID)
if err != nil {
log.Error("error unable to get FC targets from array", err)
}
for i, t := range fcTargetsInfo {
publishContext[fmt.Sprintf("%s%d", common.PublishContextFCWWPNPrefix, i)] = t.WWPN
}
nvmefcTargetInfo, err := common.GetNVMEFCTargetInfoFromStorage(client, volumeApplianceID)
if err != nil {
log.Error("error unable to get NVMeFC targets from array", err)
}
for i, t := range nvmefcTargetInfo {
publishContext[fmt.Sprintf("%s%d", common.PublishContextNVMEFCPortalsPrefix, i)] = t.Portal
publishContext[fmt.Sprintf("%s%d", common.PublishContextNVMEFCTargetsPrefix, i)] = t.Target
}
nvmetcpTargetInfo, err := common.GetNVMETCPTargetsInfoFromStorage(client, volumeApplianceID)
if err != nil {
log.Error("error unable to get NVMeTCP targets from array", err)
}
for i, t := range nvmetcpTargetInfo {
publishContext[fmt.Sprintf("%s%d", common.PublishContextNVMETCPPortalsPrefix, i)] = t.Portal
publishContext[fmt.Sprintf("%s%d", common.PublishContextNVMETCPTargetsPrefix, i)] = t.Target
}
// If the system is not capable of any protocol, then we will through the error
if len(iscsiTargetsInfo) == 0 && len(fcTargetsInfo) == 0 && len(nvmefcTargetInfo) == 0 && len(nvmetcpTargetInfo) == 0 {
return errors.New("unable to get targets for any protocol")
}
return nil
}
// NfsPublisher implementation of VolumePublisher for NFS volumes
type NfsPublisher struct {
// ExternalAccess used to set custom ip to be added to the NFS Export 'hosts' list
ExternalAccess string
}
// Publish publishes FileSystem by adding host (node) to the NFS Export 'hosts' list
func (n *NfsPublisher) Publish(ctx context.Context, req *csi.ControllerPublishVolumeRequest, client gopowerstore.Client,
kubeNodeID string, volumeID string,
) (*csi.ControllerPublishVolumeResponse, error) {
fs, err := client.GetFS(ctx, volumeID)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.NotFound() {
return nil, status.Errorf(codes.NotFound, "volume with ID '%s' not found", volumeID)
}
return nil, status.Errorf(codes.Internal, "failure checking volume status for volume publishing: %s", err.Error())
}
publishContext := make(map[string]string)
ipList := common.GetIPListFromString(kubeNodeID)
if ipList == nil || len(ipList) == 0 {
return nil, errors.New("can't find IP in node ID")
}
ip := ipList[0]
ipWithNat := make([]string, 0, 2)
ipWithNat = append(ipWithNat, ip)
// Create NFS export if it doesn't exist
_, err = client.GetNFSExportByFileSystemID(ctx, fs.ID)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.NotFound() {
_, err := client.CreateNFSExport(ctx, &gopowerstore.NFSExportCreate{
Name: fs.Name,
FileSystemID: volumeID,
Path: "/" + fs.Name,
})
if err != nil {
return nil, status.Errorf(codes.Internal, "failure creating nfs export: %s", err.Error())
}
} else {
return nil, status.Errorf(codes.Internal,
"failure checking nfs export status for volume publishing: %s", err.Error())
}
}
export, err := client.GetNFSExportByFileSystemID(ctx, fs.ID)
if err != nil {
return nil, status.Errorf(codes.Internal, "failure getting nfs export: %s", err.Error())
}
if n.ExternalAccess != "" && !common.ExternalAccessAlreadyAdded(export, n.ExternalAccess) {
externalAccess, err := common.GetIPListWithMaskFromString(n.ExternalAccess)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "can't find IP in X_CSI_POWERSTORE_EXTERNAL_ACCESS variable")
}
log.Debug("externalAccess parsed IP:", externalAccess)
ipWithNat = append(ipWithNat, externalAccess)
}
// Add host IP to existing nfs export
_, err = client.ModifyNFSExport(ctx, &gopowerstore.NFSExportModify{
AddRWRootHosts: ipWithNat,
}, export.ID)
if err != nil {
log.Debug("Error while PublishVolume: ", err.Error())
if apiError, ok := err.(gopowerstore.APIError); !(ok && (apiError.NotFound() || apiError.HostAlreadyPresentInNFSExport())) {
return nil, status.Errorf(codes.Internal, "failure when adding new host to nfs export: %s", err.Error())
}
}
nas, err := client.GetNAS(ctx, fs.NasServerID)
if err != nil {
return nil, status.Errorf(codes.Internal, "failure getting nas %s", err.Error())
}
fileInterface, err := client.GetFileInterface(ctx, nas.CurrentPreferredIPv4InterfaceID)
if err != nil {
return nil, status.Errorf(codes.Internal, "failure getting file interface %s", err.Error())
}
publishContext[KeyNasName] = nas.Name // we need to pass that to node part of the driver
publishContext[common.KeyNfsExportPath] = fileInterface.IPAddress + ":/" + export.Name
publishContext[common.KeyHostIP] = ipWithNat[0]
if n.ExternalAccess != "" {
parsedExternalAccess, _ := common.GetIPListWithMaskFromString(n.ExternalAccess)
publishContext[common.KeyNatIP] = parsedExternalAccess
}
publishContext[common.KeyExportID] = export.ID
publishContext[common.KeyAllowRoot] = req.VolumeContext[common.KeyAllowRoot]
publishContext[common.KeyNfsACL] = req.VolumeContext[common.KeyNfsACL]
return &csi.ControllerPublishVolumeResponse{PublishContext: publishContext}, nil
}
// CheckIfVolumeExists queries storage array if FileSystem with given name exists
func (n *NfsPublisher) CheckIfVolumeExists(ctx context.Context, client gopowerstore.Client, volID string) error {
_, err := client.GetFS(ctx, volID)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.NotFound() {
return status.Errorf(codes.NotFound, "volume with ID '%s' not found", volID)
}
return status.Errorf(codes.Internal, "failure checking volume status for volume publishing: %s", err.Error())
}
return nil
}
/*
*
* Copyright © 2021-2024 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package controller
import (
"context"
"fmt"
"strings"
"github.com/dell/csi-powerstore/v2/pkg/array"
csiext "github.com/dell/dell-csi-extensions/replication"
"github.com/dell/gopowerstore"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// CreateRemoteVolume creates replica of volume in remote cluster
func (s *Service) CreateRemoteVolume(ctx context.Context,
req *csiext.CreateRemoteVolumeRequest,
) (*csiext.CreateRemoteVolumeResponse, error) {
volID := req.GetVolumeHandle()
if volID == "" {
return nil, status.Error(codes.InvalidArgument, "volume ID is required")
}
id, arrayID, protocol, _, _, err := array.ParseVolumeID(ctx, volID, s.DefaultArray(), nil)
if err != nil {
log.Error(err)
return nil, err
}
arr, ok := s.Arrays()[arrayID]
if !ok {
log.Info("ip is nil")
return nil, status.Error(codes.InvalidArgument, "failed to find array with given IP")
}
vgs, err := arr.GetClient().GetVolumeGroupsByVolumeID(ctx, id)
if err != nil {
return nil, err
}
if len(vgs.VolumeGroup) == 0 {
return nil, status.Error(codes.Unimplemented, "replication of volumes that aren't assigned to group is not implemented yet")
}
vg := vgs.VolumeGroup[0]
rs, err := arr.Client.GetReplicationSessionByLocalResourceID(ctx, vg.ID)
if err != nil {
return nil, err
}
var remoteVolumeID string
for _, sp := range rs.StorageElementPairs {
if sp.LocalStorageElementID == id {
remoteVolumeID = sp.RemoteStorageElementID
}
}
if remoteVolumeID == "" {
return nil, status.Errorf(codes.Internal, "couldn't find volume id %s in storage element pairs of replication session", id)
}
vol, err := arr.Client.GetVolume(ctx, id)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't query volume: %s", err.Error())
}
localSystem, err := arr.Client.GetCluster(ctx)
if err != nil {
return nil, err
}
remoteSystem, err := arr.Client.GetRemoteSystem(ctx, rs.RemoteSystemID)
if err != nil {
return nil, err
}
remoteParams := map[string]string{
"remoteSystem": localSystem.Name,
s.replicationContextPrefix + "arrayID": remoteSystem.SerialNumber,
s.replicationContextPrefix + "managementAddress": remoteSystem.ManagementAddress,
}
remoteVolume := getRemoteCSIVolume(remoteVolumeID+"/"+remoteParams[s.replicationContextPrefix+"arrayID"]+"/"+protocol, vol.Size)
remoteVolume.VolumeContext = remoteParams
return &csiext.CreateRemoteVolumeResponse{
RemoteVolume: remoteVolume,
}, nil
}
// CreateStorageProtectionGroup creates storage protection group
func (s *Service) CreateStorageProtectionGroup(ctx context.Context,
req *csiext.CreateStorageProtectionGroupRequest,
) (*csiext.CreateStorageProtectionGroupResponse, error) {
volID := req.GetVolumeHandle()
if volID == "" {
return nil, status.Error(codes.InvalidArgument, "volume ID is required")
}
id, arrayID, protocol, _, _, err := array.ParseVolumeID(ctx, volID, s.DefaultArray(), nil)
if err != nil {
log.Error(err)
return nil, err
}
arr, ok := s.Arrays()[arrayID]
if !ok {
log.Info("id is nil")
return nil, status.Error(codes.InvalidArgument, "failed to find array with given ID")
}
if protocol == "nfs" {
return nil, status.Error(codes.InvalidArgument, "replication is not supported for NFS volumes")
}
vgs, err := arr.GetClient().GetVolumeGroupsByVolumeID(ctx, id)
if err != nil {
return nil, err
}
if len(vgs.VolumeGroup) == 0 {
return nil, status.Error(codes.Unimplemented, "replication of volumes that aren't assigned to group is not implemented yet")
}
vg := vgs.VolumeGroup[0]
rs, err := arr.Client.GetReplicationSessionByLocalResourceID(ctx, vg.ID)
if err != nil {
return nil, err
}
localSystem, err := arr.Client.GetCluster(ctx)
if err != nil {
return nil, err
}
remoteSystem, err := arr.Client.GetRemoteSystem(ctx, rs.RemoteSystemID)
if err != nil {
return nil, err
}
localParams := map[string]string{
s.replicationContextPrefix + "systemName": localSystem.Name,
s.replicationContextPrefix + "managementAddress": localSystem.ManagementAddress,
s.replicationContextPrefix + "remoteSystemName": remoteSystem.Name,
s.replicationContextPrefix + "remoteManagementAddress": remoteSystem.ManagementAddress,
s.replicationContextPrefix + "globalID": arrayID,
s.replicationContextPrefix + "remoteGlobalID": remoteSystem.SerialNumber,
s.replicationContextPrefix + "VolumeGroupName": vg.Name,
}
remoteParams := map[string]string{
s.replicationContextPrefix + "systemName": remoteSystem.Name,
s.replicationContextPrefix + "managementAddress": remoteSystem.ManagementAddress,
s.replicationContextPrefix + "remoteSystemName": localSystem.Name,
s.replicationContextPrefix + "remoteManagementAddress": localSystem.ManagementAddress,
s.replicationContextPrefix + "globalID": remoteSystem.SerialNumber,
s.replicationContextPrefix + "VolumeGroupName": vg.Name,
}
return &csiext.CreateStorageProtectionGroupResponse{
LocalProtectionGroupId: rs.LocalResourceID,
RemoteProtectionGroupId: rs.RemoteResourceID,
LocalProtectionGroupAttributes: localParams,
RemoteProtectionGroupAttributes: remoteParams,
}, nil
}
// EnsureProtectionPolicyExists ensures protection policy exists
func EnsureProtectionPolicyExists(ctx context.Context, arr *array.PowerStoreArray,
vgName string, remoteSystemName string, rpoEnum gopowerstore.RPOEnum,
) (string, error) {
// Get id of specified remote system
rs, err := arr.Client.GetRemoteSystemByName(ctx, remoteSystemName)
if err != nil {
return "", status.Errorf(codes.Internal, "can't query remote system by name: %s", err.Error())
}
ppName := "pp-" + vgName
// Check that protection policy already exists
pp, err := arr.Client.GetProtectionPolicyByName(ctx, ppName)
if err == nil {
return pp.ID, nil
}
// ensure that replicationRule exists
rrID, err := EnsureReplicationRuleExists(ctx, arr, vgName, rs.ID, rpoEnum)
if err != nil {
return "", status.Errorf(codes.Internal, "can't ensure that replication rule exists")
}
newPp, err := arr.Client.CreateProtectionPolicy(ctx, &gopowerstore.ProtectionPolicyCreate{
Name: ppName,
ReplicationRuleIDs: []string{rrID},
})
if err != nil {
return "", status.Errorf(codes.Internal, "can't create protection policy: %s", err.Error())
}
return newPp.ID, nil
}
// EnsureReplicationRuleExists ensures replication rule exists
func EnsureReplicationRuleExists(ctx context.Context, arr *array.PowerStoreArray,
vgName string, remoteSystemID string, rpoEnum gopowerstore.RPOEnum,
) (string, error) {
rrName := "rr-" + vgName
rr, err := arr.Client.GetReplicationRuleByName(ctx, rrName)
if err != nil {
// Create new rule
newRr, err := arr.Client.CreateReplicationRule(ctx, &gopowerstore.ReplicationRuleCreate{
Name: rrName,
Rpo: rpoEnum,
RemoteSystemID: remoteSystemID,
})
if err != nil {
return "", status.Errorf(codes.Internal, "can't create replication rule: %s", err.Error())
}
return newRr.ID, nil
}
return rr.ID, nil
}
// GetReplicationCapabilities is a getter for replication capabilities
func (s *Service) GetReplicationCapabilities(_ context.Context, _ *csiext.GetReplicationCapabilityRequest) (*csiext.GetReplicationCapabilityResponse, error) {
rep := new(csiext.GetReplicationCapabilityResponse)
rep.Capabilities = []*csiext.ReplicationCapability{
{
Type: &csiext.ReplicationCapability_Rpc{
Rpc: &csiext.ReplicationCapability_RPC{
Type: csiext.ReplicationCapability_RPC_CREATE_REMOTE_VOLUME,
},
},
},
{
Type: &csiext.ReplicationCapability_Rpc{
Rpc: &csiext.ReplicationCapability_RPC{
Type: csiext.ReplicationCapability_RPC_CREATE_PROTECTION_GROUP,
},
},
},
{
Type: &csiext.ReplicationCapability_Rpc{
Rpc: &csiext.ReplicationCapability_RPC{
Type: csiext.ReplicationCapability_RPC_DELETE_PROTECTION_GROUP,
},
},
},
{
Type: &csiext.ReplicationCapability_Rpc{
Rpc: &csiext.ReplicationCapability_RPC{
Type: csiext.ReplicationCapability_RPC_REPLICATION_ACTION_EXECUTION,
},
},
},
{
Type: &csiext.ReplicationCapability_Rpc{
Rpc: &csiext.ReplicationCapability_RPC{
Type: csiext.ReplicationCapability_RPC_MONITOR_PROTECTION_GROUP,
},
},
},
}
rep.Actions = []*csiext.SupportedActions{
{
Actions: &csiext.SupportedActions_Type{
Type: csiext.ActionTypes_FAILOVER_REMOTE,
},
},
{
Actions: &csiext.SupportedActions_Type{
Type: csiext.ActionTypes_UNPLANNED_FAILOVER_LOCAL,
},
},
{
Actions: &csiext.SupportedActions_Type{
Type: csiext.ActionTypes_REPROTECT_LOCAL,
},
},
{
Actions: &csiext.SupportedActions_Type{
Type: csiext.ActionTypes_SUSPEND,
},
},
{
Actions: &csiext.SupportedActions_Type{
Type: csiext.ActionTypes_RESUME,
},
},
{
Actions: &csiext.SupportedActions_Type{
Type: csiext.ActionTypes_SYNC,
},
},
}
return rep, nil
}
// ExecuteAction is a method to execute an action request
func (s *Service) ExecuteAction(ctx context.Context,
req *csiext.ExecuteActionRequest,
) (*csiext.ExecuteActionResponse, error) {
var reqID string
localParams := req.GetProtectionGroupAttributes()
protectionGroupID := req.GetProtectionGroupId()
action := req.GetAction().GetActionTypes().String()
globalID, ok := localParams[s.replicationContextPrefix+"globalID"]
if !ok {
return nil, status.Error(codes.InvalidArgument, "missing globalID in protection group attributes")
}
arr, ok := s.Arrays()[globalID]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "can't find array with global id %s", globalID)
}
pstoreClient := arr.GetClient()
// log all parameters used in ExecuteAction call
fields := map[string]interface{}{
"RequestID": reqID,
"GlobalID": localParams[s.replicationContextPrefix+"globalID"],
"ProtectedStorageGroup": protectionGroupID,
"Action": action,
}
log.WithFields(fields).Info("Executing ExecuteAction with following fields")
rs, err := pstoreClient.GetReplicationSessionByLocalResourceID(ctx, protectionGroupID)
if err != nil {
return nil, err
}
client := pstoreClient
var execAction gopowerstore.ActionType
var params *gopowerstore.FailoverParams
switch action {
case csiext.ActionTypes_FAILOVER_REMOTE.String():
execAction = gopowerstore.RsActionFailover
params = &gopowerstore.FailoverParams{IsPlanned: true, Reverse: false}
case csiext.ActionTypes_UNPLANNED_FAILOVER_LOCAL.String():
execAction = gopowerstore.RsActionFailover
params = &gopowerstore.FailoverParams{IsPlanned: false, Reverse: false}
case csiext.ActionTypes_SUSPEND.String():
execAction = gopowerstore.RsActionPause
case csiext.ActionTypes_RESUME.String():
execAction = gopowerstore.RsActionResume
case csiext.ActionTypes_SYNC.String():
execAction = gopowerstore.RsActionSync
case csiext.ActionTypes_REPROTECT_LOCAL.String():
execAction = gopowerstore.RsActionReprotect
default:
return nil, status.Errorf(codes.Unknown, "The requested action does not match with supported actions")
}
resErr := ExecuteAction(&rs, client, execAction, params)
if resErr != nil {
return nil, resErr
}
statusResp, err := s.GetStorageProtectionGroupStatus(ctx, &csiext.GetStorageProtectionGroupStatusRequest{
ProtectionGroupId: protectionGroupID,
ProtectionGroupAttributes: localParams,
})
if err != nil {
return nil, status.Errorf(codes.Internal, "can't get storage protection group status: %s", err.Error())
}
resp := &csiext.ExecuteActionResponse{
Success: true,
ActionTypes: &csiext.ExecuteActionResponse_Action{
Action: req.GetAction(),
},
Status: statusResp.Status,
}
return resp, nil
}
// ExecuteAction validates current state of replication & executes provided action on RS
func ExecuteAction(session *gopowerstore.ReplicationSession, pstoreClient gopowerstore.Client, action gopowerstore.ActionType, failoverParams *gopowerstore.FailoverParams) error {
inDesiredState, actionRequired, err := validateRSState(session, action)
if err != nil {
return err
}
if !inDesiredState {
if !actionRequired {
return status.Errorf(codes.Aborted, "Execute action: RS (%s) is still executing previous action", session.ID)
}
_, err := pstoreClient.ExecuteActionOnReplicationSession(context.Background(), session.ID, action,
failoverParams)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.UnableToFailoverFromDestination() {
log.Error(fmt.Sprintf("Fail over: Failed to modify RS (%s) - Error (%s)", session.ID, err.Error()))
return status.Errorf(codes.Internal, "Execute action: Failed to modify RS (%s) - Error (%s)", session.ID, err.Error())
}
}
log.Debugf("Action (%s) successful on RS(%s)", string(action), session.ID)
}
return nil
}
// validateRSState checks if the given action is permissible on the protected storage group based on its current state
func validateRSState(session *gopowerstore.ReplicationSession, action gopowerstore.ActionType) (inDesiredState bool, actionRequired bool, resErr error) {
state := session.State
log.Infof("replication session is in %s", state)
switch action {
case gopowerstore.RsActionResume:
if state == "OK" {
log.Infof("RS (%s) is already in desired state: (%s)", session.ID, state)
return true, false, nil
}
case gopowerstore.RsActionReprotect:
if state == "OK" {
log.Infof("RS (%s) is already in desired state: (%s)", session.ID, state)
return true, false, nil
}
case gopowerstore.RsActionPause:
if state == "Paused" || state == "Paused_For_Migration" || state == "Paused_For_NDU" {
log.Infof("RS (%s) is already in desired state: (%s)", session.ID, state)
return true, false, nil
}
case gopowerstore.RsActionFailover:
if state == "Failing_Over" {
return false, false, nil
}
if state == "Failed_Over" {
log.Infof("RS (%s) is already in desired state: (%s)", session.ID, state)
return true, false, nil
}
}
return false, true, nil
}
// DeleteStorageProtectionGroup deletes storage protection group
func (s *Service) DeleteStorageProtectionGroup(ctx context.Context,
req *csiext.DeleteStorageProtectionGroupRequest,
) (*csiext.DeleteStorageProtectionGroupResponse, error) {
localParams := req.GetProtectionGroupAttributes()
groupID := req.GetProtectionGroupId()
globalID, ok := localParams[s.replicationContextPrefix+"globalID"]
if !ok {
return nil, status.Error(codes.InvalidArgument, "missing globalID in protection group attributes")
}
arr, ok := s.Arrays()[globalID]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "can't find array with global id %s", globalID)
}
fields := map[string]interface{}{
"GlobalID": globalID,
"ProtectedStorageGroup": groupID,
}
log.WithFields(fields).Info("Deleting storage protection group")
vg, err := arr.GetClient().GetVolumeGroup(ctx, groupID)
if apiErr, ok := err.(gopowerstore.APIError); ok && !apiErr.NotFound() {
return nil, status.Errorf(codes.Internal, "Error: Unable to get Volume Group")
}
if vg.ID != "" {
if vg.ProtectionPolicyID != "" {
_, err := arr.GetClient().ModifyVolumeGroup(ctx, &gopowerstore.VolumeGroupModify{
ProtectionPolicyID: "",
}, groupID)
if apiErr, ok := err.(gopowerstore.APIError); ok && !apiErr.NotFound() {
return nil, status.Errorf(codes.Internal, "Error: Unable to un-assign PP from Volume Group")
}
}
_, err = arr.Client.DeleteVolumeGroup(ctx, groupID)
if apiError, ok := err.(gopowerstore.APIError); ok && !apiError.NotFound() {
return nil, status.Errorf(codes.Internal, "Error: %s: Unable to delete Volume Group", apiError.Error())
}
}
log.WithFields(fields).Info("Deleting protection policy")
vgName, ok := localParams[s.replicationContextPrefix+"VolumeGroupName"]
if !ok {
return nil, status.Errorf(codes.Internal, "Error: Unable to get volume group name")
}
pp, err := arr.GetClient().GetProtectionPolicyByName(ctx, "pp-"+vgName)
if apiErr, ok := err.(gopowerstore.APIError); ok && !apiErr.NotFound() {
return nil, status.Errorf(codes.Internal, "Error: Unable to get the PP")
}
if pp.ID != "" && len(pp.Volumes) == 0 && len(pp.VolumeGroups) == 0 {
_, err := arr.Client.DeleteProtectionPolicy(ctx, pp.ID)
if apiErr, ok := err.(gopowerstore.APIError); ok && !apiErr.NotFound() {
return nil, status.Errorf(codes.Internal, "Error: Unable to delete PP")
}
}
log.WithFields(fields).Info("Deleting replication rule")
rr, err := arr.GetClient().GetReplicationRuleByName(ctx, "rr-"+vgName)
if apiErr, ok := err.(gopowerstore.APIError); ok && !apiErr.NotFound() {
return nil, status.Errorf(codes.Internal, "Error: RR not found")
}
if rr.ID != "" && len(rr.ProtectionPolicies) == 0 {
_, err = arr.GetClient().DeleteReplicationRule(ctx, rr.ID)
if apiErr, ok := err.(gopowerstore.APIError); ok && !apiErr.NotFound() {
return nil, status.Errorf(codes.Internal, "Error: Unable to delete replication rule")
}
}
return &csiext.DeleteStorageProtectionGroupResponse{}, nil
}
// DeleteLocalVolume deletes a volume on the local storage array upon request from a remote replication controller.
func (s *Service) DeleteLocalVolume(ctx context.Context,
req *csiext.DeleteLocalVolumeRequest,
) (*csiext.DeleteLocalVolumeResponse, error) {
log.Info("Deleting local volume " + req.VolumeHandle + " per request from remote replication controller")
// req.VolumeHandle is of format <volumeid>/<array ID>/<protocol>. We only need the IDs.
splitHandle := strings.Split(req.VolumeHandle, `/`)
if len(splitHandle) != 3 {
return nil, status.Errorf(codes.InvalidArgument, "can't delete volume of improper handle format")
}
volumeID := splitHandle[0]
globalID := splitHandle[1]
arr, ok := s.Arrays()[globalID]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "can't find array with global ID %s", globalID)
}
vol, err := arr.GetClient().GetVolume(ctx, volumeID)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok {
if apiError.NotFound() {
// volume doesn't exist, return success
log.Info("Volume does not exist. It may have already been deleted.")
return &csiext.DeleteLocalVolumeResponse{}, nil
}
}
// any other error means the volume to be deleted couldn't be retrieved, return error
return nil, status.Errorf(codes.Internal, "Error: Unable to get volume for deletion")
}
vgs, err := arr.GetClient().GetVolumeGroupsByVolumeID(ctx, volumeID)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); !ok || !apiError.NotFound() {
return nil, err
}
}
// Do not proceed to DeleteVolume if there is a volume group or protection policy.
// DeleteVolume would remove those, and source-side deletion is the responsible party for that operation.
if len(vgs.VolumeGroup) != 0 {
log.Info("Cannot delete local volume " + volumeID + ", volume is part of a Volume Group and needs to be removed first.")
return nil, status.Errorf(codes.Internal, "Error: Unable to delete volume")
} else if vol.ProtectionPolicyID != "" {
log.Info("Cannot delete local volume " + volumeID + ", volume is under a protection policy that must be removed first.")
return nil, status.Errorf(codes.Internal, "Error: Unable to delete volume")
}
_, err = arr.GetClient().DeleteVolume(ctx, nil, volumeID)
if err != nil {
if apiErr, ok := err.(gopowerstore.APIError); !ok || !apiErr.NotFound() {
log.Info("Cannot delete local volume " + volumeID + ", deletion returned a non-404 error code.")
return nil, status.Errorf(codes.Internal, "Error: Unable to delete volume")
}
}
log.Info("Local volume deleted successfully.")
return &csiext.DeleteLocalVolumeResponse{}, nil
}
// GetStorageProtectionGroupStatus gets storage protection group status
func (s *Service) GetStorageProtectionGroupStatus(ctx context.Context,
req *csiext.GetStorageProtectionGroupStatusRequest,
) (*csiext.GetStorageProtectionGroupStatusResponse, error) {
localParams := req.GetProtectionGroupAttributes()
groupID := req.GetProtectionGroupId()
globalID, ok := localParams[s.replicationContextPrefix+"globalID"]
if !ok {
return nil, status.Error(codes.InvalidArgument, "missing globalID in protection group attributes")
}
arr, ok := s.Arrays()[globalID]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "can't find array with global id %s", globalID)
}
fields := map[string]interface{}{
"GlobalID": globalID,
"ProtectedStorageGroup": groupID,
}
log.WithFields(fields).Info("Checking replication session status")
rs, err := arr.GetClient().GetReplicationSessionByLocalResourceID(ctx, groupID)
if err != nil {
return nil, err
}
var state csiext.StorageProtectionGroupStatus_State
switch rs.State {
case gopowerstore.RsStateOk:
state = csiext.StorageProtectionGroupStatus_SYNCHRONIZED
break
case gopowerstore.RsStateFailedOver:
state = csiext.StorageProtectionGroupStatus_FAILEDOVER
break
case gopowerstore.RsStatePaused, gopowerstore.RsStatePausedForMigration, gopowerstore.RsStatePausedForNdu, gopowerstore.RsStateSystemPaused:
state = csiext.StorageProtectionGroupStatus_SUSPENDED
break
case gopowerstore.RsStateFailingOver, gopowerstore.RsStateFailingOverForDR, gopowerstore.RsStateResuming,
gopowerstore.RsStateReprotecting, gopowerstore.RsStatePartialCutoverForMigration, gopowerstore.RsStateSynchronizing,
gopowerstore.RsStateInitializing:
state = csiext.StorageProtectionGroupStatus_SYNC_IN_PROGRESS
break
case gopowerstore.RsStateError:
state = csiext.StorageProtectionGroupStatus_INVALID
break
default:
log.Infof("The status (%s) does not match with known protection group states", rs.State)
state = csiext.StorageProtectionGroupStatus_UNKNOWN
break
}
log.Infof("The current state for replication session (%s) for group (%s) is (%s).", rs.ID, groupID, state.String())
resp := &csiext.GetStorageProtectionGroupStatusResponse{
Status: &csiext.StorageProtectionGroupStatus{
State: state,
IsSource: rs.Role != "Destination",
},
}
return resp, err
}
// WithRP appends Replication Prefix to provided string
func (s *Service) WithRP(key string) string {
replicationPrefix := s.replicationPrefix
if replicationPrefix == "" {
replicationPrefix = ReplicationPrefix
}
return replicationPrefix + "/" + key
}
func getRemoteCSIVolume(volumeID string, size int64) *csiext.Volume {
volume := &csiext.Volume{
CapacityBytes: size,
VolumeId: volumeID,
VolumeContext: nil, // TODO: add values to volume context if needed
}
return volume
}
/*
*
* Copyright © 2021 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package controller
import (
"context"
"github.com/dell/gopowerstore"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const (
// FilesystemSnapshotType represents filesystem snapshot type
FilesystemSnapshotType SnapshotType = "filesystem"
// BlockSnapshotType represents block snapshot type
BlockSnapshotType SnapshotType = "block"
)
// SnapshotType represents type of snapshot
type SnapshotType string
// VolumeSnapshot represents snapshot of the block Volume
type VolumeSnapshot gopowerstore.Volume
// FilesystemSnapshot represents snapshot of the FileSystem
type FilesystemSnapshot gopowerstore.FileSystem
// GeneralSnapshot is an interface for combining both Volume and FileSystem
type GeneralSnapshot interface {
// GetID returns ID of the snapshot
GetID() string
// GetSourceID returns ID of the volume/fs that snapshot was created from
GetSourceID() string
// GetSize returns current size of the snapshot
GetSize() int64
// GetType returns type of general snapshot (either filesystem or block)
GetType() SnapshotType
}
// GetID returns ID of the snapshot
func (v VolumeSnapshot) GetID() string {
return v.ID
}
// GetSourceID returns ID of the volume/fs that snapshot was created from
func (v VolumeSnapshot) GetSourceID() string {
return v.ProtectionData.SourceID
}
// GetSize returns current size of the snapshot
func (v VolumeSnapshot) GetSize() int64 {
return v.Size
}
// GetType returns type of general snapshot (either filesystem or block)
func (v VolumeSnapshot) GetType() SnapshotType {
return BlockSnapshotType
}
// GetID returns ID of the snapshot
func (f FilesystemSnapshot) GetID() string {
return f.ID
}
// GetSourceID returns ID of the volume/fs that snapshot was created from
func (f FilesystemSnapshot) GetSourceID() string {
return f.ParentID
}
// GetSize returns current size of the snapshot
func (f FilesystemSnapshot) GetSize() int64 {
return f.SizeTotal
}
// GetType returns type of general snapshot (either filesystem or block)
func (f FilesystemSnapshot) GetType() SnapshotType {
return FilesystemSnapshotType
}
// VolumeSnapshotter allow to create snapshot of the volume/fs
type VolumeSnapshotter interface {
// GetExistingSnapshot queries storage array if given snapshot already exists
GetExistingSnapshot(context.Context, string, gopowerstore.Client) (GeneralSnapshot, error)
// Create creates new snapshot of a given volume
Create(context.Context, string, string, gopowerstore.Client) (gopowerstore.CreateResponse, error)
}
// SCSISnapshotter is a implementation of VolumeSnapshotter for SCSI based (FC, iSCSI) volumes
type SCSISnapshotter struct{}
// GetExistingSnapshot queries storage array if given snapshot of the Volume already exists
func (*SCSISnapshotter) GetExistingSnapshot(ctx context.Context, snapName string, client gopowerstore.Client) (GeneralSnapshot, error) {
snap, err := client.GetVolumeByName(ctx, snapName)
if err != nil {
return VolumeSnapshot{}, status.Errorf(codes.Internal,
"can't find volume snapshot '%s'", snapName)
}
return VolumeSnapshot(snap), nil
}
// Create creates new snapshot of a given Volume
func (*SCSISnapshotter) Create(ctx context.Context, snapName string, sourceID string, client gopowerstore.Client) (gopowerstore.CreateResponse, error) {
reqParams := &gopowerstore.SnapshotCreate{
Name: &snapName,
Description: nil,
}
return client.CreateSnapshot(ctx, reqParams, sourceID)
}
// NfsSnapshotter is a implementation of VolumeSnapshotter for NFS volumes
type NfsSnapshotter struct{}
// GetExistingSnapshot queries storage array if given snapshot of the FileSystem already exists
func (*NfsSnapshotter) GetExistingSnapshot(ctx context.Context, snapName string, client gopowerstore.Client) (GeneralSnapshot, error) {
snap, err := client.GetFSByName(ctx, snapName)
if err != nil {
return FilesystemSnapshot{}, status.Errorf(codes.Internal,
"can't find filesystem snapshot '%s': %s", snapName, err.Error())
}
return FilesystemSnapshot(snap), nil
}
// Create creates new snapshot of a given FileSystem
func (*NfsSnapshotter) Create(ctx context.Context, snapName string, sourceID string, client gopowerstore.Client) (gopowerstore.CreateResponse, error) {
reqParams := &gopowerstore.SnapshotFSCreate{
Name: snapName,
Description: "",
}
return client.CreateFsSnapshot(ctx, reqParams, sourceID)
}
/*
*
* Copyright © 2021 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package identity provides CSI specification compatible identity service.
package identity
import (
"context"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/protobuf/types/known/wrapperspb"
)
// NewIdentityService creates new identity service
func NewIdentityService(name string, version string, manifest map[string]string) *Service {
return &Service{
name: name,
version: version,
ready: true,
manifest: manifest,
}
}
// Service is a identity service allows driver to return capabilities, health, and other metadata
type Service struct {
name string
version string
manifest map[string]string
ready bool
}
// GetPluginInfo returns general information about plugin (driver) such as name, version and manifest
func (s Service) GetPluginInfo(_ context.Context, _ *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
return &csi.GetPluginInfoResponse{
Name: s.name,
VendorVersion: s.version,
Manifest: s.manifest,
}, nil
}
// GetPluginCapabilities returns capabilities that are supported by the driver
func (s Service) GetPluginCapabilities(_ context.Context, _ *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
var rep csi.GetPluginCapabilitiesResponse
rep.Capabilities = []*csi.PluginCapability{
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
},
},
},
{
Type: &csi.PluginCapability_VolumeExpansion_{
VolumeExpansion: &csi.PluginCapability_VolumeExpansion{
Type: csi.PluginCapability_VolumeExpansion_ONLINE,
},
},
},
{
Type: &csi.PluginCapability_VolumeExpansion_{
VolumeExpansion: &csi.PluginCapability_VolumeExpansion{
Type: csi.PluginCapability_VolumeExpansion_OFFLINE,
},
},
},
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS,
},
},
},
}
return &rep, nil
}
// Probe returns current state of the driver and if it is ready to receive requests
func (s Service) Probe(_ context.Context, _ *csi.ProbeRequest) (*csi.ProbeResponse, error) {
return &csi.ProbeResponse{Ready: &wrapperspb.BoolValue{Value: s.ready}}, nil
}
/*
*
* Copyright © 2021-2023 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package interceptors contains custom unary gRPC interceptors.
package interceptors
import (
"context"
"fmt"
"io"
"sync"
"time"
"github.com/akutz/gosync"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/dell/csi-powerstore/v2/pkg/common"
controller "github.com/dell/csi-powerstore/v2/pkg/controller"
"github.com/dell/gocsi/middleware/serialvolume"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
csictx "github.com/dell/gocsi/context"
mwtypes "github.com/dell/gocsi/middleware/serialvolume/types"
log "github.com/sirupsen/logrus"
xctx "golang.org/x/net/context"
"github.com/dell/csi-metadata-retriever/retriever"
"github.com/kubernetes-csi/csi-lib-utils/connection"
"github.com/kubernetes-csi/csi-lib-utils/metrics"
)
type rewriteRequestIDInterceptor struct{}
func (r *rewriteRequestIDInterceptor) handleServer(ctx context.Context, req interface{},
_ *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
) (interface{}, error) {
// Retrieve the gRPC metadata from the incoming context.
md, mdOK := metadata.FromIncomingContext(ctx)
// Check the metadata from the request ID.
if mdOK {
ID, IDOK := md[csictx.RequestIDKey]
if IDOK {
newIDValue := fmt.Sprintf("%s-%s", csictx.RequestIDKey, ID[0])
ctx = context.WithValue(ctx, interface{}(csictx.RequestIDKey), newIDValue)
}
}
return handler(ctx, req)
}
// NewRewriteRequestIDInterceptor creates new unary interceptor that rewrites request IDs
func NewRewriteRequestIDInterceptor() grpc.UnaryServerInterceptor {
interceptor := &rewriteRequestIDInterceptor{}
return interceptor.handleServer
}
type lockProvider struct {
volIDLocksL sync.Mutex
volNameLocksL sync.Mutex
volIDLocks map[string]gosync.TryLocker
volNameLocks map[string]gosync.TryLocker
}
func (i *lockProvider) GetLockWithID(_ context.Context, id string) (gosync.TryLocker, error) {
i.volIDLocksL.Lock()
defer i.volIDLocksL.Unlock()
lock := i.volIDLocks[id]
if lock == nil {
lock = &gosync.TryMutex{}
i.volIDLocks[id] = lock
}
return lock, nil
}
func (i *lockProvider) GetLockWithName(_ context.Context, name string) (gosync.TryLocker, error) {
i.volNameLocksL.Lock()
defer i.volNameLocksL.Unlock()
lock := i.volNameLocks[name]
if lock == nil {
lock = &gosync.TryMutex{}
i.volNameLocks[name] = lock
}
return lock, nil
}
type opts struct {
timeout time.Duration
locker mwtypes.VolumeLockerProvider
MetadataSidecarClient retriever.MetadataRetrieverClient
}
type interceptor struct {
opts opts
}
// NewCustomSerialLock creates new unary interceptor that locks gRPC requests
func NewCustomSerialLock(mode string) grpc.UnaryServerInterceptor {
locker := &lockProvider{
volIDLocks: map[string]gosync.TryLocker{},
volNameLocks: map[string]gosync.TryLocker{},
}
gocsiSerializer := serialvolume.New(serialvolume.WithLockProvider(locker))
i := &interceptor{opts{locker: locker, timeout: 0}}
// To avoid unnecessary call, we can add a conditional check to prevent this logic from being called unnecessarily in node pods.
if mode == "controller" {
i.createMetadataRetrieverClient(context.Background())
}
handle := func(ctx xctx.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
switch t := req.(type) {
case *csi.CreateVolumeRequest:
return i.createVolume(ctx, t, info, handler)
case *csi.NodeStageVolumeRequest:
return i.nodeStageVolume(ctx, t, info, handler)
case *csi.NodeUnstageVolumeRequest:
return i.nodeUnstageVolume(ctx, t, info, handler)
default:
return gocsiSerializer(ctx, req, info, handler)
}
}
return handle
}
func (i *interceptor) createMetadataRetrieverClient(ctx context.Context) {
metricsManager := metrics.NewCSIMetricsManagerWithOptions("csi-metadata-retriever",
metrics.WithProcessStartTime(false),
metrics.WithSubsystem(metrics.SubsystemSidecar))
if retrieverAddress, ok := csictx.LookupEnv(ctx, common.EnvMetadataRetrieverEndpoint); ok {
rpcConn, err := connection.Connect(retrieverAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss()))
if err != nil {
log.Error(err.Error())
}
retrieverClient := retriever.NewMetadataRetrieverClient(rpcConn, 100*time.Second)
if retrieverClient == nil {
log.Error("Cannot get csi-metadata-retriever client")
}
i.opts.MetadataSidecarClient = retrieverClient
} else {
log.Warn("env var not found: ", common.EnvMetadataRetrieverEndpoint)
}
}
const pending = "pending"
func (i *interceptor) nodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest,
_ *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
) (res interface{}, resErr error) {
lock, err := i.opts.locker.GetLockWithID(ctx, req.VolumeId)
if err != nil {
return nil, err
}
if closer, ok := lock.(io.Closer); ok {
defer closer.Close() // #nosec G307
}
if !lock.TryLock(i.opts.timeout) {
return nil, status.Error(codes.Aborted, pending)
}
defer lock.Unlock()
return handler(ctx, req)
}
func (i *interceptor) nodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest,
_ *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
) (res interface{}, resErr error) {
lock, err := i.opts.locker.GetLockWithID(ctx, req.VolumeId)
if err != nil {
return nil, err
}
if closer, ok := lock.(io.Closer); ok {
defer closer.Close() // #nosec G307
}
if !lock.TryLock(i.opts.timeout) {
return nil, status.Error(codes.Aborted, pending)
}
defer lock.Unlock()
return handler(ctx, req)
}
func (i *interceptor) createVolume(ctx context.Context, req *csi.CreateVolumeRequest,
_ *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
) (res interface{}, resErr error) {
lock, err := i.opts.locker.GetLockWithID(ctx, req.Name)
if err != nil {
return nil, err
}
if closer, ok := lock.(io.Closer); ok {
defer closer.Close() // #nosec G307
}
if !lock.TryLock(i.opts.timeout) {
return nil, status.Error(codes.Aborted, pending)
}
defer lock.Unlock()
metadataReq := &retriever.GetPVCLabelsRequest{
Name: req.Parameters[controller.KeyCSIPVCName],
NameSpace: req.Parameters[controller.KeyCSIPVCNamespace],
}
if i.opts.MetadataSidecarClient != nil {
metadataRes, err := i.opts.MetadataSidecarClient.GetPVCLabels(ctx, metadataReq)
if err != nil {
log.Errorf("Cannot retrieve labels for PVC %s in namespace: %s, error: %v",
controller.KeyCSIPVCName,
controller.KeyCSIPVCNamespace,
err.Error())
}
if metadataRes != nil {
for k, v := range metadataRes.Parameters {
req.Parameters[k] = v
}
} else {
log.Warnf("Metadata retrieved is nil for PVC %s in namespace: %s",
controller.KeyCSIPVCName,
controller.KeyCSIPVCNamespace)
}
}
return handler(ctx, req)
}
/*
*
* Copyright © 2022 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package node
import (
"context"
"fmt"
"os/exec"
"regexp"
"strings"
"github.com/dell/gopowerstore"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// NFSv4ACLsInterface contains method definition to set NFSv4 ACLs
type NFSv4ACLsInterface interface {
// SetNfsv4Acls sets NFSv4 ACLs
SetNfsv4Acls(acls string, dir string) error
}
// NFSv4ACLs implements setting NFSv4 ACLs
type NFSv4ACLs struct{}
func validateAndSetACLs(ctx context.Context, s NFSv4ACLsInterface, nasName string, client gopowerstore.Client, acls string, dir string) (bool, error) {
aclsConfigured := false
if nfsv4ACLs(acls) {
if isNfsv4Enabled(ctx, client, nasName) {
if err := s.SetNfsv4Acls(acls, dir); err != nil {
log.Error(fmt.Sprintf("can't assign NFSv4 ACLs to folder %s: %s", dir, err.Error()))
return false, err
}
aclsConfigured = true
} else {
return false, status.Errorf(codes.Internal, "can't assign NFSv4 ACLs to folder %s: NAS server is not NFSv4 enabled", dir)
}
} else {
return false, status.Errorf(codes.Internal, "can't assign ACLs to folder %s: invalid NFSv4 ACL format %s", dir, acls)
}
return aclsConfigured, nil
}
func posixMode(acls string) bool {
if matched, _ := regexp.Match(`\d{3,4}`, []byte(acls)); matched {
return true
}
return false
}
func nfsv4ACLs(acls string) bool {
aclsList := strings.Split(acls, ",")
for _, acl := range aclsList {
matched, err := regexp.Match(`([ADUL]:\w*:[\w.]*[@]*[\w.]*:\w*)`, []byte(acl))
if !matched || err != nil {
return false
}
}
return true
}
// SetNfsv4Acls sets NFSv4 ACLS
func (n *NFSv4ACLs) SetNfsv4Acls(acls string, dir string) error {
command := []string{"nfs4_setfacl", "-s", acls, dir}
log.Infof("NFSv4 ACL command: %s \n", strings.Join(command, " "))
// arguments for exec.Command() are validated in caller
cmd := exec.Command(command[0], command[1:]...) // #nosec G204
outStr, err := cmd.Output()
log.Infof("NFSv4 ACL output: %s \n", string(outStr))
return err
}
func isNfsv4Enabled(ctx context.Context, client gopowerstore.Client, nasName string) bool {
nfsv4Enabled := false
nas, err := gopowerstore.Client.GetNASByName(client, ctx, nasName)
if err == nil {
nfsServer, err := gopowerstore.Client.GetNfsServer(client, ctx, nas.NfsServers[0].ID)
if err == nil {
if nfsServer.IsNFSv4Enabled {
nfsv4Enabled = true
} else {
log.Error(fmt.Sprintf("NFS v4 not enabled on NAS server: %s\n", nasName))
}
} else {
log.Error(fmt.Sprintf("can't fetch nfs server with id %s: %s", nas.NfsServers[0].ID, err.Error()))
}
} else {
log.Error(fmt.Sprintf("can't determine nfsv4 enabled: %s", err.Error()))
}
return nfsv4Enabled
}
/*
*
* Copyright © 2021-2023 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package node
import (
"bytes"
"context"
"errors"
"fmt"
"net"
"path"
"path/filepath"
"strconv"
"strings"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/dell/csi-powerstore/v2/pkg/common"
"github.com/dell/csi-powerstore/v2/pkg/common/fs"
"github.com/dell/gobrick"
csictx "github.com/dell/gocsi/context"
"github.com/dell/gofsutil"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const (
powerStoreMaxNodeNameLength = 64
blockVolumePathMarker = "/csi/volumeDevices/publish/"
sysBlock = "/sys/block/"
defaultNodeNamePrefix = "csi-node"
defaultNodeChrootPath = "/noderoot"
// default opts values
defaultTmpDir = "tmp"
ephemeralStagingMountPath = "/var/lib/kubelet/plugins/kubernetes.io/csi/pv/ephemeral/"
commonNfsVolumeFolder = "common_folder"
)
// ISCSIConnector is wrapper of gobrcik.ISCSIConnector interface.
// It allows to connect iSCSI volumes to the node.
type ISCSIConnector interface {
ConnectVolume(ctx context.Context, info gobrick.ISCSIVolumeInfo) (gobrick.Device, error)
DisconnectVolumeByDeviceName(ctx context.Context, name string) error
GetInitiatorName(ctx context.Context) ([]string, error)
}
// NVMEConnector is wrapper of gobrick.NVMEConnector interface.
// It allows to connect NVMe volumes to the node.
type NVMEConnector interface {
ConnectVolume(ctx context.Context, info gobrick.NVMeVolumeInfo, useFC bool) (gobrick.Device, error)
DisconnectVolumeByDeviceName(ctx context.Context, name string) error
GetInitiatorName(ctx context.Context) ([]string, error)
}
// FcConnector is wrapper of gobrcik.FcConnector interface.
// It allows to connect FC volumes to the node.
type FcConnector interface {
ConnectVolume(ctx context.Context, info gobrick.FCVolumeInfo) (gobrick.Device, error)
DisconnectVolumeByDeviceName(ctx context.Context, name string) error
GetInitiatorPorts(ctx context.Context) ([]string, error)
}
func getNodeOptions() Opts {
var opts Opts
ctx := context.Background()
if path, ok := csictx.LookupEnv(ctx, common.EnvNodeIDFilePath); ok {
opts.NodeIDFilePath = path
}
if kubeConfigPath, ok := csictx.LookupEnv(ctx, common.EnvKubeConfigPath); ok {
opts.KubeConfigPath = kubeConfigPath
}
if prefix, ok := csictx.LookupEnv(ctx, common.EnvNodeNamePrefix); ok {
opts.NodeNamePrefix = prefix
}
if opts.NodeNamePrefix == "" {
opts.NodeNamePrefix = defaultNodeNamePrefix
}
if kubeNodeName, ok := csictx.LookupEnv(ctx, common.EnvKubeNodeName); ok {
opts.KubeNodeName = kubeNodeName
}
if nodeChrootPath, ok := csictx.LookupEnv(ctx, common.EnvNodeChrootPath); ok {
opts.NodeChrootPath = nodeChrootPath
}
if opts.NodeChrootPath == "" {
opts.NodeChrootPath = defaultNodeChrootPath
}
if maxVolumesPerNodeStr, ok := csictx.LookupEnv(ctx, common.EnvMaxVolumesPerNode); ok {
maxVolumesPerNode, err := strconv.ParseInt(maxVolumesPerNodeStr, 10, 64)
if err != nil {
log.Warn("error while parsing the value of maxPowerstoreVolumesPerNode, using default value 0")
opts.MaxVolumesPerNode = 0
} else {
opts.MaxVolumesPerNode = maxVolumesPerNode
}
}
if tmpDir, ok := csictx.LookupEnv(ctx, common.EnvTmpDir); ok {
opts.TmpDir = tmpDir
}
if opts.TmpDir == "" {
opts.TmpDir = defaultTmpDir
}
if fcPortsFilterFilePath, ok := csictx.LookupEnv(ctx, common.EnvFCPortsFilterFilePath); ok {
opts.FCPortsFilterFilePath = fcPortsFilterFilePath
}
// pb parses an environment variable into a boolean value. If an error
// is encountered, default is set to false, and error is logged
pb := func(n string) bool {
if v, ok := csictx.LookupEnv(ctx, n); ok {
b, err := strconv.ParseBool(v)
if err != nil {
log.WithField(n, v).Debug("invalid boolean value. defaulting to false")
return false
}
return b
}
return false
}
opts.EnableCHAP = pb(common.EnvEnableCHAP)
if opts.EnableCHAP {
opts.CHAPUsername = "admin"
opts.CHAPPassword = common.RandomString(12)
}
return opts
}
func formatWWPN(data string) (string, error) {
var buffer bytes.Buffer
for i, v := range data {
_, err := buffer.WriteRune(v)
if err != nil {
return "", err
}
if i%2 != 0 && i < len(data)-1 {
_, err := buffer.WriteString(":")
if err != nil {
return "", err
}
}
}
return buffer.String(), nil
}
// Get preferred outbound ip of this machine
func getOutboundIP(endpoint string, fs fs.Interface) (net.IP, error) {
conn, err := fs.NetDial(endpoint)
if err != nil {
return nil, err
}
defer conn.Close() // #nosec G307
localAddr := conn.LocalAddr().(*net.UDPAddr)
return localAddr.IP, nil
}
func getStagedDev(ctx context.Context, stagePath string, fs fs.Interface) (string, error) {
mountInfo, found, err := getTargetMount(ctx, stagePath, fs)
if err != nil {
return "", status.Errorf(codes.Internal,
"can't check mounts for path %s: %s", stagePath, err.Error())
}
if !found {
return "", nil
}
sourceDev := mountInfo.Device
// for bind mounts
if sourceDev == "devtmpfs" || sourceDev == "udev" {
sourceDev = mountInfo.Source
}
return sourceDev, nil
}
func getStagingPath(ctx context.Context, sp string, volID string) string {
logFields := common.GetLogFields(ctx)
if sp == "" || volID == "" {
return ""
}
stagingPath := path.Join(sp, volID)
log.WithFields(logFields).Infof("staging path is: %s", stagingPath)
return path.Join(sp, volID)
}
func getRemnantTargetMounts(ctx context.Context, target string, fs fs.Interface) ([]gofsutil.Info, bool, error) {
logFields := common.GetLogFields(ctx)
var targetMounts []gofsutil.Info
var found bool
mounts, err := getMounts(ctx, fs)
if err != nil {
log.Error("could not reliably determine existing mount status")
return targetMounts, false, status.Error(codes.Internal, "could not reliably determine existing mount status")
}
for _, mount := range mounts {
if strings.Contains(mount.Path, target) {
targetMounts = append(targetMounts, mount)
log.WithFields(logFields).Infof("matching remnantTargetMount %s target %s", target, mount.Path)
found = true
}
}
return targetMounts, found, nil
}
func getTargetMount(ctx context.Context, target string, fs fs.Interface) (gofsutil.Info, bool, error) {
logFields := common.GetLogFields(ctx)
var targetMount gofsutil.Info
var found bool
mounts, err := getMounts(ctx, fs)
if err != nil {
log.Error("could not reliably determine existing mount status")
return targetMount, false, status.Error(codes.Internal,
"could not reliably determine existing mount status")
}
for _, mount := range mounts {
if mount.Path == target {
targetMount = mount
log.WithFields(logFields).Infof("matching targetMount %s target %s",
target, mount.Path)
found = true
break
}
}
return targetMount, found, nil
}
func getMounts(_ context.Context, fs fs.Interface) ([]gofsutil.Info, error) {
data, err := consistentRead(procMountsPath, procMountsRetries, fs)
if err != nil {
return []gofsutil.Info{}, err
}
info, err := fs.ParseProcMounts(context.Background(), bytes.NewReader(data))
if err != nil {
return []gofsutil.Info{}, err
}
return info, nil
}
func consistentRead(filename string, retry int, fs fs.Interface) ([]byte, error) {
oldContent, err := fs.ReadFile(filepath.Clean(filename))
if err != nil {
return nil, err
}
for i := 0; i < retry; i++ {
newContent, err := fs.ReadFile(filepath.Clean(filename))
if err != nil {
return nil, err
}
if bytes.Compare(oldContent, newContent) == 0 {
log.Infof("successfully read mount file snapshot retry count: %d", i)
return newContent, nil
}
// Files are different, continue reading
oldContent = newContent
}
return nil, fmt.Errorf("could not get consistent content of %s after %d attempts", filename, retry)
}
func createMapping(volID, deviceName, tmpDir string, fs fs.Interface) error {
return fs.WriteFile(path.Join(tmpDir, volID), []byte(deviceName), 0o640)
}
func getMapping(volID, tmpDir string, fs fs.Interface) (string, error) {
data, err := fs.ReadFile(path.Join(tmpDir, volID))
if err != nil {
return "", err
}
if len(data) == 0 {
return "", errors.New("no device name in mapping")
}
return string(data), nil
}
func deleteMapping(volID, tmpDir string, fs fs.Interface) error {
err := fs.Remove(path.Join(tmpDir, volID))
if fs.IsNotExist(err) {
return nil
}
return err
}
func isBlock(cap *csi.VolumeCapability) bool {
_, isBlock := cap.GetAccessType().(*csi.VolumeCapability_Block)
return isBlock
}
func isAlreadyPublished(ctx context.Context, targetPath, rwMode string, fs fs.Interface) (bool, error) {
mount, found, err := getTargetMount(ctx, targetPath, fs)
if err != nil {
return false, status.Errorf(codes.Internal,
"can't check mounts for path %s: %s", targetPath, err.Error())
}
if !found {
return false, nil
}
if !contains(mount.Opts, rwMode) {
return false, status.Errorf(codes.FailedPrecondition,
"volume already mounted but with different capabilities: %s",
mount.Opts)
}
return true, nil
}
func contains(list []string, item string) bool {
for _, x := range list {
if x == item {
return true
}
}
return false
}
func getRWModeString(isRO bool) string {
if isRO {
return "ro"
}
return "rw"
}
func format(_ context.Context, source, fsType string, fs fs.Interface, opts ...string) error {
f := log.Fields{
"source": source,
"fsType": fsType,
"options": opts,
}
// Use 'ext4' as the default
if fsType == "" {
fsType = "ext4"
}
mkfsCmd := fmt.Sprintf("mkfs.%s", fsType)
mkfsArgs := []string{"-E", "nodiscard", "-F", source}
if fsType == "xfs" {
mkfsArgs = []string{"-K", source}
}
mkfsArgs = append(mkfsArgs, opts...)
log.WithFields(f).Infof("formatting with command: %s %v", mkfsCmd, mkfsArgs)
out, err := fs.ExecCommand(mkfsCmd, mkfsArgs...)
if err != nil {
log.WithFields(f).WithError(err).Errorf("formatting disk failed, output: %q", string(out))
return errors.New(string(out))
}
return nil
}
/*
*
* Copyright © 2021-2023 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package node
import (
"context"
"errors"
"fmt"
"os"
"regexp"
"strconv"
"github.com/container-storage-interface/spec/lib/go/csi"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func parseSize(size string) (int64, error) {
pattern := `(\d*) ?Gi$`
pathMetadata := regexp.MustCompile(pattern)
matches := pathMetadata.FindStringSubmatch(size)
for i, match := range matches {
if i != 0 {
bytes, err := strconv.ParseInt(match, 10, 64)
if err != nil {
return 0, status.Error(codes.Internal, "Failed to parse size")
}
return bytes * 1073741824, nil
}
}
return 0, errors.New("failed to parse bytes")
}
func (s *Service) ephemeralNodePublish(
ctx context.Context,
req *csi.NodePublishVolumeRequest) (
*csi.NodePublishVolumeResponse, error,
) {
if _, err := s.Fs.Stat(ephemeralStagingMountPath); os.IsNotExist(err) {
log.Debug("path does not exists")
err = s.Fs.MkdirAll(ephemeralStagingMountPath, 0o750)
if err != nil {
log.Errorf("NodestageErrorEph %s", err.Error())
return nil, status.Error(codes.Internal, "Unable to create directory for mounting ephemeral volumes")
}
}
volID := req.VolumeId
volName := fmt.Sprintf("ephemeral-%s", volID)
volSize, err := parseSize(req.VolumeContext["size"])
if err != nil {
log.Errorf("Parse size failed %s", err.Error())
return nil, status.Error(codes.Internal, "inline ephemeral parse size failed")
}
crvolresp, err := s.ctrlSvc.CreateVolume(ctx, &csi.CreateVolumeRequest{
Name: volName,
CapacityRange: &csi.CapacityRange{
RequiredBytes: volSize,
LimitBytes: volSize,
},
VolumeCapabilities: []*csi.VolumeCapability{req.VolumeCapability},
Parameters: req.VolumeContext,
Secrets: req.Secrets,
})
if err != nil {
log.Errorf("CreateVolume Ephemeral %s", err.Error())
return nil, status.Error(codes.Internal, "inline ephemeral create volume failed")
}
errLock := s.Fs.MkdirAll(ephemeralStagingMountPath+volID, 0o750)
if errLock != nil {
return nil, errLock
}
f, errLock := s.Fs.Create(ephemeralStagingMountPath + volID + "/id")
if errLock != nil {
return nil, errLock
}
defer f.Close() //#nosec
_, errLock = s.Fs.WriteString(f, crvolresp.Volume.VolumeId)
if errLock != nil {
return nil, errLock
}
cpubresp, err := s.ctrlSvc.ControllerPublishVolume(ctx, &csi.ControllerPublishVolumeRequest{
VolumeId: crvolresp.Volume.VolumeId,
NodeId: s.nodeID,
VolumeCapability: req.VolumeCapability,
Readonly: req.Readonly,
Secrets: req.Secrets,
VolumeContext: crvolresp.Volume.VolumeContext,
})
if err != nil {
log.Infof("Rolling back and calling unpublish ephemeral volumes with VolId %s", crvolresp.Volume.VolumeId)
_, _ = s.NodeUnpublishVolume(ctx, &csi.NodeUnpublishVolumeRequest{
VolumeId: volID,
TargetPath: req.TargetPath,
})
return nil, status.Error(codes.Internal, "inline ephemeral controller publish failed")
}
_, err = s.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{
VolumeId: crvolresp.Volume.VolumeId,
PublishContext: cpubresp.PublishContext,
StagingTargetPath: ephemeralStagingMountPath,
VolumeCapability: req.VolumeCapability,
Secrets: req.Secrets,
VolumeContext: crvolresp.Volume.VolumeContext,
})
if err != nil {
log.Errorf("NodeStageErrEph %s", err.Error())
log.Infof("Rolling back and calling unpublish ephemeral volumes with VolId %s", crvolresp.Volume.VolumeId)
_, _ = s.NodeUnpublishVolume(ctx, &csi.NodeUnpublishVolumeRequest{
VolumeId: volID,
TargetPath: req.TargetPath,
})
return nil, status.Error(codes.Internal, "inline ephemeral node stage failed")
}
delete(crvolresp.Volume.VolumeContext, "csi.storage.k8s.io/ephemeral")
_, err = s.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{
VolumeId: crvolresp.Volume.VolumeId,
PublishContext: cpubresp.PublishContext,
StagingTargetPath: ephemeralStagingMountPath,
TargetPath: req.TargetPath,
VolumeCapability: req.VolumeCapability,
Readonly: req.Readonly,
Secrets: req.Secrets,
VolumeContext: crvolresp.Volume.VolumeContext,
})
if err != nil {
log.Errorf("NodePublishErrEph %s", err.Error())
_, _ = s.NodeUnpublishVolume(ctx, &csi.NodeUnpublishVolumeRequest{
VolumeId: volID,
TargetPath: req.TargetPath,
})
return nil, status.Error(codes.Internal, "inline ephemeral node publish failed")
}
return &csi.NodePublishVolumeResponse{}, nil
}
func (s *Service) ephemeralNodeUnpublish(
ctx context.Context,
req *csi.NodeUnpublishVolumeRequest,
) error {
volID := req.GetVolumeId()
if volID == "" {
return status.Error(codes.InvalidArgument, "volume ID is required")
}
stagingPath := ephemeralStagingMountPath
lockFile := ephemeralStagingMountPath + volID + "/id"
dat, err := s.Fs.ReadFile(lockFile)
if os.IsNotExist(err) {
return status.Error(codes.Internal, "Inline ephemeral. Was unable to read lockfile")
}
goodVolid := string(dat)
_, err = s.NodeUnstageVolume(ctx, &csi.NodeUnstageVolumeRequest{
VolumeId: goodVolid,
StagingTargetPath: stagingPath,
})
if err != nil {
log.Info(err)
return status.Error(codes.Internal, "Inline ephemeral node unstage unpublish failed")
}
log.Info("Calling unpublish")
_, err = s.ctrlSvc.ControllerUnpublishVolume(ctx, &csi.ControllerUnpublishVolumeRequest{
VolumeId: goodVolid,
NodeId: s.nodeID,
})
if err != nil {
return status.Error(codes.Internal, "Inline ephemeral controller unpublish unpublish failed")
}
_, err = s.ctrlSvc.DeleteVolume(ctx, &csi.DeleteVolumeRequest{
VolumeId: goodVolid,
})
if err != nil {
return err
}
err = os.RemoveAll(ephemeralStagingMountPath + volID)
if err != nil {
return status.Error(codes.Internal, "Failed to cleanup lockfiles")
}
return nil
}
/*
*
* Copyright © 2021-2024 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package node provides CSI specification compatible node service.
package node
import (
"context"
"errors"
"fmt"
"net/http"
"os"
"path"
"path/filepath"
"regexp"
"strconv"
"strings"
"github.com/dell/gonvme"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/dell/csi-powerstore/v2/pkg/array"
"github.com/dell/csi-powerstore/v2/pkg/common"
"github.com/dell/csi-powerstore/v2/pkg/common/fs"
"github.com/dell/csi-powerstore/v2/pkg/common/k8sutils"
"github.com/dell/csi-powerstore/v2/pkg/controller"
"github.com/dell/gobrick"
csictx "github.com/dell/gocsi/context"
"github.com/dell/gofsutil"
"github.com/dell/goiscsi"
"github.com/dell/gopowerstore"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
// Opts defines service configuration options.
type Opts struct {
NodeIDFilePath string
NodeNamePrefix string
NodeChrootPath string
MaxVolumesPerNode int64
FCPortsFilterFilePath string
KubeNodeName string
KubeConfigPath string
CHAPUsername string
CHAPPassword string
TmpDir string
EnableCHAP bool
}
// Service is a controller service that contains scsi connectors and implements NodeServer API
type Service struct {
Fs fs.Interface
ctrlSvc controller.Interface
iscsiConnector ISCSIConnector
fcConnector FcConnector
nvmeConnector NVMEConnector
iscsiLib goiscsi.ISCSIinterface
nvmeLib gonvme.NVMEinterface
iscsiTargets map[string][]string
nvmeTargets map[string][]string
opts Opts
nodeID string
useFC bool
useNVME bool
useNFS bool
initialized bool
reusedHost bool
isHealthMonitorEnabled bool
isPodmonEnabled bool
array.Locker
}
const (
maxPowerstoreVolumesPerNodeLabel = "max-powerstore-volumes-per-node"
)
// Init initializes node service by parsing environmental variables, connecting it as a host.
// Will init ISCSIConnector, FcConnector and ControllerService if they are nil.
func (s *Service) Init() error {
ctx := context.Background()
s.opts = getNodeOptions()
s.initConnectors()
err := s.updateNodeID()
if err != nil {
return fmt.Errorf("can't update node id: %s", err.Error())
}
s.iscsiTargets = make(map[string][]string)
s.nvmeTargets = make(map[string][]string)
iscsiInitiators, fcInitiators, nvmeInitiators, err := s.getInitiators()
if err != nil {
return fmt.Errorf("can't get initiators of the node: %s", err.Error())
}
if isPodmonEnabled, ok := csictx.LookupEnv(ctx, common.EnvPodmonEnabled); ok {
// in case of any error in reading/parsing the env variable default value will be false
s.isPodmonEnabled, _ = strconv.ParseBool(isPodmonEnabled)
}
if len(iscsiInitiators) == 0 && len(fcInitiators) == 0 && len(nvmeInitiators) == 0 {
s.useNFS = true
go s.startAPIService(ctx)
return nil
}
// Setup host on each of available arrays
for _, arr := range s.Arrays() {
if arr.BlockProtocol == common.NoneTransport {
continue
}
var initiators []string
switch arr.BlockProtocol {
case common.NVMETCPTransport:
if len(nvmeInitiators) == 0 {
log.Errorf("NVMeTCP transport was requested but NVMe initiator is not available")
}
s.useNVME = true
s.useFC = false
case common.NVMEFCTransport:
if len(nvmeInitiators) == 0 {
log.Errorf("NVMeFC transport was requested but NVMe initiator is not available")
}
s.useNVME = true
s.useFC = true
case common.ISCSITransport:
if len(iscsiInitiators) == 0 {
log.Errorf("iSCSI transport was requested but iSCSI initiator is not available")
}
s.useNVME = false
s.useFC = false
case common.FcTransport:
if len(fcInitiators) == 0 {
log.Errorf("FC transport was requested but FC initiator is not available")
}
s.useNVME = false
s.useFC = true
default:
s.useNVME = len(nvmeInitiators) > 0
s.useFC = len(fcInitiators) > 0
}
if s.useNVME {
initiators = nvmeInitiators
if s.useFC {
log.Infof("NVMeFC Protocol is requested")
} else {
log.Infof("NVMeTCP Protocol is requested")
}
} else if s.useFC {
initiators = fcInitiators
log.Infof("FC Protocol is requested")
} else {
initiators = iscsiInitiators
log.Infof("iSCSI Protocol is requested")
}
err = s.setupHost(initiators, arr.GetClient(), arr.GetIP())
if err != nil {
log.Errorf("can't setup host on %s: %s", arr.Endpoint, err.Error())
}
}
if isHealthMonitorEnabled, ok := csictx.LookupEnv(ctx, common.EnvIsHealthMonitorEnabled); ok {
s.isHealthMonitorEnabled, _ = strconv.ParseBool(isHealthMonitorEnabled)
}
go s.startAPIService(ctx)
return nil
}
func (s *Service) initConnectors() {
gobrick.SetLogger(&common.CustomLogger{})
if s.iscsiConnector == nil {
s.iscsiConnector = gobrick.NewISCSIConnector(
gobrick.ISCSIConnectorParams{
Chroot: s.opts.NodeChrootPath,
ChapUser: s.opts.CHAPUsername,
ChapPassword: s.opts.CHAPPassword,
ChapEnabled: s.opts.EnableCHAP,
})
}
if s.fcConnector == nil {
s.fcConnector = gobrick.NewFCConnector(
gobrick.FCConnectorParams{Chroot: s.opts.NodeChrootPath})
}
if s.nvmeConnector == nil {
s.nvmeConnector = gobrick.NewNVMeConnector(
gobrick.NVMeConnectorParams{Chroot: s.opts.NodeChrootPath})
}
if s.ctrlSvc == nil {
svc := &controller.Service{Fs: s.Fs}
svc.SetArrays(s.Arrays())
svc.SetDefaultArray(s.DefaultArray())
s.ctrlSvc = svc
}
if s.iscsiLib == nil {
iSCSIOpts := make(map[string]string)
iSCSIOpts["chrootDirectory"] = s.opts.NodeChrootPath
s.iscsiLib = goiscsi.NewLinuxISCSI(iSCSIOpts)
}
if s.nvmeLib == nil {
NVMeOpts := make(map[string]string)
NVMeOpts["chrootDirectory"] = s.opts.NodeChrootPath
s.nvmeLib = gonvme.NewNVMe(NVMeOpts)
}
}
// NodeStageVolume prepares volume to be consumed by node publish by connecting volume to the node
func (s *Service) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
logFields := common.GetLogFields(ctx)
if req.GetVolumeCapability() == nil {
return nil, status.Error(codes.InvalidArgument, "volume capability is required")
}
id := req.GetVolumeId()
if id == "" {
return nil, status.Error(codes.InvalidArgument, "volume ID is required")
}
if req.GetStagingTargetPath() == "" {
return nil, status.Error(codes.InvalidArgument, "staging target path is required")
}
id, arrayID, protocol, _, _, _ := array.ParseVolumeID(ctx, id, s.DefaultArray(), req.VolumeCapability)
var stager VolumeStager
arr, ok := s.Arrays()[arrayID]
if !ok {
return nil, status.Errorf(codes.Internal, "can't find array with provided arrayID %s", arrayID)
}
if protocol == "nfs" {
stager = &NFSStager{
array: arr,
}
} else {
stager = &SCSIStager{
useFC: s.useFC,
useNVME: s.useNVME,
iscsiConnector: s.iscsiConnector,
nvmeConnector: s.nvmeConnector,
fcConnector: s.fcConnector,
}
}
return stager.Stage(ctx, req, logFields, s.Fs, id)
}
// NodeUnstageVolume reverses steps done in NodeStage by disconnecting volume from the node
func (s *Service) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
logFields := common.GetLogFields(ctx)
var err error
id := req.GetVolumeId()
if id == "" {
return nil, status.Error(codes.InvalidArgument, "volume ID is required")
}
if req.GetStagingTargetPath() == "" {
return nil, status.Error(codes.InvalidArgument, "staging target path is required")
}
id, _, protocol, _, _, err := array.ParseVolumeID(ctx, id, s.DefaultArray(), nil)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.NotFound() {
return &csi.NodeUnstageVolumeResponse{}, nil
}
return nil, status.Errorf(codes.Unknown,
"failure checking volume status for volume node unstage: %s",
err.Error())
}
// append additional path to be able to do bind mounts
stagingPath := getStagingPath(ctx, req.GetStagingTargetPath(), id)
device, err := unstageVolume(ctx, stagingPath, id, logFields, err, s.Fs)
if err != nil {
return nil, err
}
if protocol == "nfs" {
return &csi.NodeUnstageVolumeResponse{}, nil
}
if device != "" {
err := createMapping(id, device, s.opts.TmpDir, s.Fs)
if err != nil {
log.WithFields(logFields).Warningf("failed to create vol to device mapping: %s", err.Error())
}
} else {
device, err = getMapping(id, s.opts.TmpDir, s.Fs)
if err != nil {
log.WithFields(logFields).Info("no device found. skip device removal")
return &csi.NodeUnstageVolumeResponse{}, nil
}
}
f := log.Fields{"Device": device}
connectorCtx := common.SetLogFields(context.Background(), logFields)
if s.useNVME {
err = s.nvmeConnector.DisconnectVolumeByDeviceName(connectorCtx, device)
} else if s.useFC {
err = s.fcConnector.DisconnectVolumeByDeviceName(connectorCtx, device)
} else {
err = s.iscsiConnector.DisconnectVolumeByDeviceName(connectorCtx, device)
}
if err != nil {
log.WithFields(logFields).Error(err)
return nil, err
}
log.WithFields(logFields).WithFields(f).Info("block device removal complete")
err = deleteMapping(id, s.opts.TmpDir, s.Fs)
if err != nil {
log.WithFields(logFields).Warningf("failed to remove vol to Dev mapping: %s", err.Error())
}
return &csi.NodeUnstageVolumeResponse{}, nil
}
func unstageVolume(ctx context.Context, stagingPath, id string, logFields log.Fields, err error, fs fs.Interface) (string, error) {
logFields["ID"] = id
logFields["StagingPath"] = stagingPath
ctx = common.SetLogFields(ctx, logFields)
log.WithFields(logFields).Info("calling unstage")
device, err := getStagedDev(ctx, stagingPath, fs)
if err != nil {
return "", status.Errorf(codes.Internal,
"could not reliably determine existing mount for path %s: %s", stagingPath, err.Error())
}
if device != "" {
_, device = path.Split(device)
log.WithFields(logFields).Infof("active mount exist")
err = fs.GetUtil().Unmount(ctx, stagingPath)
if err != nil {
return "", status.Errorf(codes.Internal,
"could not unmount dev %s: %s", device, err.Error())
}
log.WithFields(logFields).Infof("unmount without error")
} else {
// no mounts
log.WithFields(logFields).Infof("no mounts found")
}
err = fs.Remove(stagingPath)
if err != nil && fs.IsDeviceOrResourceBusy(err) {
log.Warnf("failed to delete mount path : %s", err)
var remnantDevice string
remnantDevice, err = removeRemnantMounts(ctx, stagingPath, fs, logFields)
if device == "" {
device = remnantDevice
}
}
if err != nil && !fs.IsNotExist(err) {
return "", status.Errorf(codes.Internal, "failed to delete mount path %s: %s", stagingPath, err.Error())
}
log.WithFields(logFields).Infof("target mount file deleted")
return device, nil
}
func removeRemnantMounts(ctx context.Context, stagingPath string, fs fs.Interface, logFields log.Fields) (string, error) {
log.WithFields(logFields).Infof("getting remnant mount")
mounts, found, err := getRemnantTargetMounts(ctx, stagingPath, fs)
if !found || err != nil {
return "", fmt.Errorf("could not reliably determine remnant mounts for path %s: %s", stagingPath, err.Error())
}
log.WithFields(logFields).Infof("%d remnant mount exist", len(mounts))
for _, mount := range mounts {
delete(logFields, "StagingPath")
logFields["RemnantPath"] = mount.Path
err = fs.GetUtil().Unmount(ctx, mount.Path)
if err != nil {
return "", fmt.Errorf("could not unmount dev %s: %s", mount.Path, err.Error())
}
log.WithFields(logFields).Infof("unmount without error")
}
delete(logFields, "RemnantPath")
logFields["StagingPath"] = stagingPath
err = fs.Remove(stagingPath)
return mounts[0].Device, err
}
// NodePublishVolume publishes volume to the node by mounting it to the target path
func (s *Service) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
logFields := common.GetLogFields(ctx)
var ephemeralVolume bool
ephemeral, ok := req.VolumeContext["csi.storage.k8s.io/ephemeral"]
if ok {
ephemeralVolume = strings.ToLower(ephemeral) == "true"
}
if ephemeralVolume {
return s.ephemeralNodePublish(ctx, req)
}
// Get the VolumeID and validate against the volume
id := req.GetVolumeId()
if id == "" {
return nil, status.Error(codes.InvalidArgument, "volume ID is required")
}
targetPath := req.GetTargetPath()
if targetPath == "" {
return nil, status.Error(codes.InvalidArgument, "targetPath is required")
}
if req.GetVolumeCapability() == nil {
return nil, status.Error(codes.InvalidArgument, "VolumeCapability is required")
}
if req.GetStagingTargetPath() == "" {
return nil, status.Error(codes.InvalidArgument, "stagingPath is required")
}
id, _, protocol, _, _, _ := array.ParseVolumeID(ctx, id, s.DefaultArray(), req.VolumeCapability)
// append additional path to be able to do bind mounts
stagingPath := getStagingPath(ctx, req.GetStagingTargetPath(), id)
isRO := req.GetReadonly()
volumeCapability := req.GetVolumeCapability()
logFields["ID"] = id
logFields["TargetPath"] = targetPath
logFields["StagingPath"] = stagingPath
logFields["ReadOnly"] = req.GetReadonly()
ctx = common.SetLogFields(ctx, logFields)
log.WithFields(logFields).Info("calling publish")
var publisher VolumePublisher
if protocol == "nfs" {
if s.fileExists(filepath.Join(stagingPath, commonNfsVolumeFolder)) {
// Assume root squashing is enabled
stagingPath = filepath.Join(stagingPath, commonNfsVolumeFolder)
}
publisher = &NFSPublisher{}
} else {
publisher = &SCSIPublisher{
isBlock: isBlock(req.VolumeCapability),
}
}
return publisher.Publish(ctx, logFields, s.Fs, volumeCapability, isRO, targetPath, stagingPath)
}
// NodeUnpublishVolume unpublishes volume from the node by unmounting it from the target path
func (s *Service) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
logFields := common.GetLogFields(ctx)
var err error
targetPath := req.GetTargetPath()
if targetPath == "" {
log.Error("target path required")
return nil, status.Error(codes.InvalidArgument, "target path required")
}
volID := req.GetVolumeId()
if volID == "" {
return nil, status.Error(codes.InvalidArgument, "volume ID is required")
}
var ephemeralVolume bool
lockFile := ephemeralStagingMountPath + volID + "/id"
if s.fileExists(lockFile) {
ephemeralVolume = true
}
logFields["ID"] = volID
logFields["TargetPath"] = targetPath
ctx = common.SetLogFields(ctx, logFields)
log.WithFields(logFields).Info("calling unpublish")
_, found, err := getTargetMount(ctx, targetPath, s.Fs)
if err != nil {
return nil, status.Errorf(codes.Internal,
"could not reliably determine existing mount status for path %s: %s",
targetPath, err.Error())
}
if !found {
// no mounts
log.WithFields(logFields).Infof("no mounts found")
return &csi.NodeUnpublishVolumeResponse{}, nil
}
log.WithFields(logFields).Infof("active mount exist")
err = s.Fs.GetUtil().Unmount(ctx, targetPath)
if err != nil {
return nil, status.Errorf(codes.Internal,
"could not unmount dev %s: %s",
targetPath, err.Error())
}
log.WithFields(logFields).Info("unpublish complete")
log.Debug("Checking for ephemeral after node unpublish")
if ephemeralVolume {
log.Info("Detected ephemeral")
err = s.ephemeralNodeUnpublish(ctx, req)
if err != nil {
return nil, err
}
}
return &csi.NodeUnpublishVolumeResponse{}, nil
}
// NodeGetVolumeStats returns volume usage stats
func (s *Service) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
volumeID := req.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "no volume ID provided")
}
volumePath := req.GetVolumePath()
if len(volumePath) == 0 {
return nil, status.Error(codes.InvalidArgument, "no volume Path provided")
}
// parse volume Id
id, arrayID, protocol, _, _, err := array.ParseVolumeID(ctx, volumeID, s.DefaultArray(), nil)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.NotFound() {
return nil, err
}
return nil, err
}
arr, ok := s.Arrays()[arrayID]
if !ok {
return nil, status.Error(codes.InvalidArgument, "failed to find array with given ID")
}
// Validate if volume exists
if protocol == "nfs" {
fs, err := arr.Client.GetFS(ctx, id)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); !ok || !apiError.NotFound() {
return nil, status.Errorf(codes.NotFound, "failed to find filesystem %s with error: %v", id, err.Error())
}
resp := &csi.NodeGetVolumeStatsResponse{
VolumeCondition: &csi.VolumeCondition{
Abnormal: true,
Message: fmt.Sprintf("Filesystem %s is not found", id),
},
}
return resp, nil
}
nfsExport, err := s.Arrays()[arrayID].Client.GetNFSExportByFileSystemID(ctx, fs.ID)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); !ok || !apiError.NotFound() {
return nil, status.Errorf(codes.NotFound, "failed to find nfs export for filesystem with error: %v", err.Error())
}
resp := &csi.NodeGetVolumeStatsResponse{
VolumeCondition: &csi.VolumeCondition{
Abnormal: true,
Message: fmt.Sprintf("NFS export for volume %s is not found", id),
},
}
return resp, nil
}
// get hosts publish to export
hosts := append(nfsExport.ROHosts, nfsExport.RORootHosts...)
hosts = append(hosts, nfsExport.RWHosts...)
hosts = append(hosts, nfsExport.RWRootHosts...)
attached := false
for _, host := range hosts {
if s.nodeID == host {
attached = true
}
}
if !attached {
resp := &csi.NodeGetVolumeStatsResponse{
VolumeCondition: &csi.VolumeCondition{
Abnormal: true,
Message: fmt.Sprintf("host %s is not attached to NFS export for filesystem %s", s.nodeID, id),
},
}
return resp, nil
}
} else {
_, err := arr.Client.GetVolume(ctx, id)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); !ok || !apiError.NotFound() {
return nil, status.Errorf(codes.NotFound, "failed to find volume %s with error: %v", id, err.Error())
}
resp := &csi.NodeGetVolumeStatsResponse{
VolumeCondition: &csi.VolumeCondition{
Abnormal: true,
Message: fmt.Sprintf("Volume %s is not found", id),
},
}
return resp, nil
}
// get hosts published to volume
hostMappings, err := s.Arrays()[arrayID].Client.GetHostVolumeMappingByVolumeID(ctx, id)
if err != nil {
return nil, status.Errorf(codes.NotFound, "failed to get host volume mapping for volume: %s with error: %v", id, err.Error())
}
hostMapped := false
for _, hostMapping := range hostMappings {
host, err := s.Arrays()[arrayID].Client.GetHost(ctx, hostMapping.HostID)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); !ok || !apiError.NotFound() {
return nil, status.Errorf(codes.NotFound, "failed to get host: %s with error: %v", hostMapping.HostID, err.Error())
}
resp := &csi.NodeGetVolumeStatsResponse{
VolumeCondition: &csi.VolumeCondition{
Abnormal: true,
Message: fmt.Sprintf("host %s is not attached to volume %s", s.nodeID, id),
},
}
return resp, nil
}
if host.Name == s.nodeID {
hostMapped = true
}
iscsiConnection := false
for _, initiator := range host.Initiators {
if len(initiator.ActiveSessions) > 0 {
iscsiConnection = true
}
}
if !iscsiConnection {
resp := &csi.NodeGetVolumeStatsResponse{
VolumeCondition: &csi.VolumeCondition{
Abnormal: true,
Message: fmt.Sprintf("host %s has no active initiator connection", s.nodeID),
},
}
return resp, nil
}
}
if !hostMapped {
resp := &csi.NodeGetVolumeStatsResponse{
VolumeCondition: &csi.VolumeCondition{
Abnormal: true,
Message: fmt.Sprintf("host %s is not attached to volume %s", s.nodeID, id),
},
}
return resp, nil
}
}
stagingPath := req.GetStagingTargetPath()
if len(stagingPath) != 0 {
// Check if staging target path is mounted
_, found, err := getTargetMount(ctx, stagingPath, s.Fs)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't check mounts for path %s: %s", volumePath, err.Error())
}
if !found {
resp := &csi.NodeGetVolumeStatsResponse{
VolumeCondition: &csi.VolumeCondition{
Abnormal: true,
Message: fmt.Sprintf("staging target path %s not mounted for volume %s", stagingPath, id),
},
}
return resp, nil
}
}
// Check if target path is mounted
_, found, err := getTargetMount(ctx, volumePath, s.Fs)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't check mounts for path %s: %s", volumePath, err.Error())
}
if !found {
resp := &csi.NodeGetVolumeStatsResponse{
VolumeCondition: &csi.VolumeCondition{
Abnormal: true,
Message: fmt.Sprintf("volume path %s not mounted for volume %s", volumePath, id),
},
}
return resp, nil
}
// check if volume path is accessible
_, err = os.ReadDir(volumePath)
if err != nil {
resp := &csi.NodeGetVolumeStatsResponse{
VolumeCondition: &csi.VolumeCondition{
Abnormal: true,
Message: fmt.Sprintf("volume path %s not accessible for volume %s", volumePath, id),
},
}
return resp, nil
}
// get volume metrics for mounted volume path
availableBytes, totalBytes, usedBytes, totalInodes, freeInodes, usedInodes, err := gofsutil.FsInfo(ctx, volumePath)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get metrics for volume with error: %v", err)
}
resp := &csi.NodeGetVolumeStatsResponse{
Usage: []*csi.VolumeUsage{
{
Available: availableBytes,
Total: totalBytes,
Used: usedBytes,
Unit: csi.VolumeUsage_BYTES,
},
{
Available: freeInodes,
Total: totalInodes,
Used: usedInodes,
Unit: csi.VolumeUsage_INODES,
},
},
VolumeCondition: &csi.VolumeCondition{
Abnormal: false,
Message: "",
},
}
return resp, nil
}
// NodeExpandVolume expands the volume by re-scanning and resizes filesystem if needed
func (s *Service) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
var reqID string
var err error
headers, ok := metadata.FromIncomingContext(ctx)
if ok {
if req, ok := headers["csi.requestid"]; ok && len(req) > 0 {
reqID = req[0]
}
}
// Get the VolumeID and validate against the volume
id, arrayID, _, _, _, err := array.ParseVolumeID(ctx, req.VolumeId, s.DefaultArray(), nil)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.NotFound() {
return nil, err
}
return nil, err
}
arr, ok := s.Arrays()[arrayID]
if !ok {
return nil, status.Error(codes.InvalidArgument, "failed to find array with given ID")
}
targetPath := req.GetVolumePath()
if targetPath == "" {
return nil, status.Error(codes.InvalidArgument, "targetPath is required")
}
isBlock := strings.Contains(targetPath, blockVolumePathMarker)
// Parse the CSI VolumeId and validate against the volume
vol, err := arr.Client.GetVolume(ctx, id)
if err != nil {
// If the volume isn't found, we cannot stage it
return nil, status.Error(codes.NotFound, "Volume not found")
}
volumeWWN := vol.Wwn
// Locate and fetch all (multipath/regular) mounted paths using this volume
var devMnt *gofsutil.DeviceMountInfo
var targetmount string
devMnt, err = s.Fs.GetUtil().GetMountInfoFromDevice(ctx, vol.Name)
if err != nil {
if isBlock {
return s.nodeExpandRawBlockVolume(ctx, volumeWWN)
}
log.Infof("Failed to find mount info for (%s) with error (%s)", vol.Name, err.Error())
log.Info("Probably offline volume expansion. Will try to perform a temporary mount.")
var disklocation string
disklocation = fmt.Sprintf("%s/%s", targetPath, vol.ID)
log.Infof("DisklLocation: %s", disklocation)
targetmount = fmt.Sprintf("tmp/%s/%s", vol.ID, vol.Name)
log.Infof("TargetMount: %s", targetmount)
err = s.Fs.MkdirAll(targetmount, 0o750)
if err != nil {
return nil, status.Error(codes.Internal,
fmt.Sprintf("Failed to find mount info for (%s) with error (%s)", vol.Name, err.Error()))
}
err = s.Fs.GetUtil().Mount(ctx, disklocation, targetmount, "")
if err != nil {
return nil, status.Error(codes.Internal,
fmt.Sprintf("Failed to find mount info for (%s) with error (%s)", vol.Name, err.Error()))
}
defer func() {
if targetmount != "" {
log.Infof("Clearing down temporary mount points in: %s", targetmount)
err := s.Fs.GetUtil().Unmount(ctx, targetmount)
if err != nil {
log.Error("Failed to remove temporary mount points")
}
err = s.Fs.RemoveAll(targetmount)
if err != nil {
log.Error("Failed to remove temporary mount points")
}
}
}()
devMnt, err = s.Fs.GetUtil().GetMountInfoFromDevice(ctx, vol.Name)
if err != nil {
return nil, status.Error(codes.Internal,
fmt.Sprintf("Failed to find mount info for (%s) with error (%s)", vol.Name, err.Error()))
}
}
log.Infof("Mount info for volume %s: %+v", vol.Name, devMnt)
size := req.GetCapacityRange().GetRequiredBytes()
f := log.Fields{
"CSIRequestID": reqID,
"VolumeName": vol.Name,
"VolumePath": targetPath,
"Size": size,
"VolumeWWN": volumeWWN,
}
log.WithFields(f).Info("Calling resize the file system")
if !s.useNVME {
// Rescan the device for the volume expanded on the array
for _, device := range devMnt.DeviceNames {
devicePath := sysBlock + device
err = s.Fs.GetUtil().DeviceRescan(context.Background(), devicePath)
if err != nil {
log.Errorf("Failed to rescan device (%s) with error (%s)", devicePath, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
}
}
// Expand the filesystem with the actual expanded volume size.
if devMnt.MPathName != "" {
err = s.Fs.GetUtil().ResizeMultipath(context.Background(), devMnt.MPathName)
if err != nil {
log.Errorf("Failed to resize filesystem: device (%s) with error (%s)", devMnt.MountPoint, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
}
// For a regular device, get the device path (devMnt.DeviceNames[1]) where the filesystem is mounted
// PublishVolume creates devMnt.DeviceNames[0] but is left unused for regular devices
var devicePath string
if len(devMnt.DeviceNames) > 1 {
devicePath = "/dev/" + devMnt.DeviceNames[1]
} else if len(devMnt.DeviceNames) == 0 {
return nil, status.Error(codes.Internal,
fmt.Sprintf("Failed to find mount info for (%s) DeviceNames (%v)", vol.Name, devMnt.DeviceNames))
} else {
devicePath = "/dev/" + devMnt.DeviceNames[0]
}
fsType, err := s.Fs.GetUtil().FindFSType(context.Background(), devMnt.MountPoint)
if err != nil {
log.Errorf("Failed to fetch filesystem for volume (%s) with error (%s)", devMnt.MountPoint, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
log.Infof("Found %s filesystem mounted on volume %s", fsType, devMnt.MountPoint)
// Resize the filesystem
var xfsNew bool
checkVersCmd := "xfs_growfs -V"
bufcheck, errcheck := s.Fs.ExecCommandOutput("bash", "-c", checkVersCmd)
if errcheck != nil {
return nil, errcheck
}
outputcheck := string(bufcheck)
versionRegx := regexp.MustCompile(`version (?P<versmaj>\d+)\.(?P<versmin>\d+)\..+`)
match := versionRegx.FindStringSubmatch(outputcheck)
subMatchMap := make(map[string]string)
for i, name := range versionRegx.SubexpNames() {
if i != 0 {
subMatchMap[name] = match[i]
}
}
if s, err := strconv.ParseFloat(subMatchMap["versmaj"]+"."+subMatchMap["versmin"], 64); err == nil {
fmt.Println(s)
if s >= 5.0 { // need to check exact version
xfsNew = true
} else {
xfsNew = false
}
} else {
return nil, status.Error(codes.Internal, err.Error())
}
if fsType == "xfs" && xfsNew {
// Passing empty string for ppathDevice since we don't need the powerpath device
err = s.Fs.GetUtil().ResizeFS(context.Background(), devMnt.MountPoint, devicePath, "", "", fsType)
if err != nil {
log.Errorf("Failed to resize filesystem: mountpoint (%s) device (%s) with error (%s)",
devMnt.MountPoint, devicePath, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
} else {
// Passing empty string for ppathDevice since we don't need the powerpath device
err = s.Fs.GetUtil().ResizeFS(context.Background(), devMnt.MountPoint, devicePath, "", devMnt.MPathName, fsType)
if err != nil {
log.Errorf("Failed to resize filesystem: mountpoint (%s) device (%s) with error (%s)",
devMnt.MountPoint, devicePath, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
}
return &csi.NodeExpandVolumeResponse{}, nil
}
func (s *Service) nodeExpandRawBlockVolume(ctx context.Context, volumeWWN string) (*csi.NodeExpandVolumeResponse, error) {
log.Info(" Block volume expansion. Will try to perform a rescan...")
wwnNum := strings.Replace(volumeWWN, "naa.", "", 1)
deviceNames, err := s.Fs.GetUtil().GetSysBlockDevicesForVolumeWWN(context.Background(), wwnNum)
if err != nil {
log.Errorf("Failed to get block devices with error (%s)", err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if len(deviceNames) > 0 {
var devName string
for _, deviceName := range deviceNames {
devicePath := sysBlock + deviceName
log.Infof("Rescanning unmounted (raw block) device %s to expand size", deviceName)
err = s.Fs.GetUtil().DeviceRescan(context.Background(), devicePath)
if err != nil {
log.Errorf("Failed to rescan device (%s) with error (%s)", devicePath, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
devName = deviceName
}
mpathDev, err := s.Fs.GetUtil().GetMpathNameFromDevice(ctx, devName)
fmt.Println("mpathDev: " + mpathDev)
if err != nil {
log.Errorf("Failed to get mpath name for device (%s) with error (%s)", devName, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if mpathDev != "" {
err = s.Fs.GetUtil().ResizeMultipath(context.Background(), mpathDev)
if err != nil {
log.Errorf("Failed to resize multipath of block device (%s) with error (%s)", mpathDev, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
}
log.Info("Block volume successfuly rescaned.")
return &csi.NodeExpandVolumeResponse{}, nil
}
log.Error("No raw block devices found")
return nil, status.Error(codes.NotFound, "No raw block devices found")
}
// NodeGetCapabilities returns supported features by the node service
func (s *Service) NodeGetCapabilities(_ context.Context, _ *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
newCap := func(cap csi.NodeServiceCapability_RPC_Type) *csi.NodeServiceCapability {
return &csi.NodeServiceCapability{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: cap,
},
},
}
}
var capabilities []*csi.NodeServiceCapability
for _, capability := range []csi.NodeServiceCapability_RPC_Type{
csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
csi.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
} {
capabilities = append(capabilities, newCap(capability))
}
if s.isHealthMonitorEnabled {
for _, capability := range []csi.NodeServiceCapability_RPC_Type{
csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
csi.NodeServiceCapability_RPC_VOLUME_CONDITION,
} {
capabilities = append(capabilities, newCap(capability))
}
}
return &csi.NodeGetCapabilitiesResponse{
Capabilities: capabilities,
}, nil
}
// NodeGetInfo returns id of the node and topology constraints
func (s *Service) NodeGetInfo(ctx context.Context, _ *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
// Create the topology keys
// <driver name>/<endpoint>-<protocol>: true
resp := &csi.NodeGetInfoResponse{
NodeId: s.nodeID,
AccessibleTopology: &csi.Topology{
Segments: map[string]string{},
},
}
for _, arr := range s.Arrays() {
_, err := getOutboundIP(arr.GetIP(), s.Fs)
if err == nil {
resp.AccessibleTopology.Segments[common.Name+"/"+arr.GetIP()+"-nfs"] = "true"
}
if arr.BlockProtocol != common.NoneTransport {
if s.useNVME {
if s.useFC {
nvmefcInfo, err := common.GetNVMEFCTargetInfoFromStorage(arr.GetClient(), "")
if err != nil {
log.Errorf("couldn't get targets from the array: %s", err.Error())
continue
}
log.Infof("Discovering NVMeFC targets")
nvmefcConnectCount := 0
for _, info := range nvmefcInfo {
NVMeFCTargets, err := s.nvmeLib.DiscoverNVMeFCTargets(info.Portal, false)
if err != nil {
log.Errorf("couldn't discover NVMeFC targets")
continue
}
for _, target := range NVMeFCTargets {
err = s.nvmeLib.NVMeFCConnect(target, false)
if err != nil {
log.Errorf("couldn't connect to NVMeFC target")
} else {
nvmefcConnectCount = nvmefcConnectCount + 1
otherTargets := s.nvmeTargets[arr.GlobalID]
s.nvmeTargets[arr.GlobalID] = append(otherTargets, target.TargetNqn)
}
}
}
if nvmefcConnectCount != 0 {
resp.AccessibleTopology.Segments[common.Name+"/"+arr.GetIP()+"-nvmefc"] = "true"
}
} else {
// useNVME/TCP
infoList, err := common.GetNVMETCPTargetsInfoFromStorage(arr.GetClient(), "")
if err != nil {
log.Errorf("couldn't get targets from array: %s", err.Error())
continue
}
var nvmeTargets []gonvme.NVMeTarget
for _, address := range infoList {
// doesn't matter how many portals are present, discovering from any one will list out all targets
nvmeIP := strings.Split(address.Portal, ":")
log.Info("Trying to discover NVMe target from portal ", nvmeIP[0])
nvmeTargets, err = s.nvmeLib.DiscoverNVMeTCPTargets(nvmeIP[0], false)
if err != nil {
log.Error("couldn't discover targets")
continue
}
break
}
loginToAtleastOneTarget := false
for _, target := range nvmeTargets {
log.Info("Logging to NVMe target ", target)
err = s.nvmeLib.NVMeTCPConnect(target, false)
if err != nil {
log.Errorf("couldn't connect to the nvme target")
continue
}
otherTargets := s.nvmeTargets[arr.GlobalID]
s.nvmeTargets[arr.GlobalID] = append(otherTargets, target.TargetNqn)
loginToAtleastOneTarget = true
}
if loginToAtleastOneTarget {
resp.AccessibleTopology.Segments[common.Name+"/"+arr.GetIP()+"-nvmetcp"] = "true"
} else {
s.useNFS = true
}
}
} else if s.useFC {
// Check node initiators connection to array
nodeID := s.nodeID
if s.reusedHost {
ipList := common.GetIPListFromString(nodeID)
if ipList == nil || len(ipList) == 0 {
log.Errorf("can't find ip in nodeID %s", nodeID)
continue
}
ip := ipList[len(ipList)-1]
nodeID = nodeID[:len(nodeID)-len(ip)-1]
}
host, err := arr.GetClient().GetHostByName(ctx, nodeID)
if err != nil {
log.WithFields(log.Fields{
"hostName": nodeID,
"error": err,
}).Error("could not find host on PowerStore array")
continue
}
if len(host.Initiators) == 0 {
log.Error("host initiators array is empty")
continue
}
if len(host.Initiators[0].ActiveSessions) != 0 {
resp.AccessibleTopology.Segments[common.Name+"/"+arr.GetIP()+"-fc"] = "true"
} else {
log.WithFields(log.Fields{
"hostName": host.Name,
"initiator": host.Initiators[0].PortName,
}).Error("there is no active FC sessions")
continue
}
} else {
infoList, err := common.GetISCSITargetsInfoFromStorage(arr.GetClient(), "")
if err != nil {
log.Errorf("couldn't get targets from array: %s", err.Error())
continue
}
var iscsiTargets []goiscsi.ISCSITarget
for _, address := range infoList {
// first check if this portal is reachable from this machine or not
if ReachableEndPoint(address.Portal) {
// doesn't matter how many portals are present, discovering from any one will list out all targets
log.Info("Trying to discover iSCSI target from portal ", address.Portal)
iscsiTargets, err = s.iscsiLib.DiscoverTargets(address.Portal, false)
if err != nil {
log.Error("couldn't discover targets")
continue
}
break
}
log.Debugf("Portal %s is not rechable from the node", address.Portal)
}
// login is also performed as a part of ConnectVolume by using dynamically created chap credentials, In case if it fails here
if len(iscsiTargets) > 0 {
resp.AccessibleTopology.Segments[common.Name+"/"+arr.GetIP()+"-iscsi"] = "true"
}
loginToAtleastOneTarget := false
for _, target := range iscsiTargets {
if ReachableEndPoint(target.Portal) {
log.Info("Logging to Iscsi target ", target)
if s.opts.EnableCHAP {
log.Debug("Setting CHAP Credentials before login")
err = s.iscsiLib.SetCHAPCredentials(target, s.opts.CHAPUsername, s.opts.CHAPPassword)
if err != nil {
log.Errorf("couldn't connect to the iscsi target")
}
}
err = s.iscsiLib.PerformLogin(target)
if err != nil {
log.Errorf("couldn't connect to the iscsi target")
continue
}
otherTargets := s.iscsiTargets[arr.GlobalID]
s.iscsiTargets[arr.GlobalID] = append(otherTargets, target.Target)
loginToAtleastOneTarget = true
} else {
log.Debugf("Target's Portal %s is not rechable from the node ", target.Portal)
}
}
if !loginToAtleastOneTarget {
s.useNFS = true
}
}
}
}
var maxVolumesPerNode int64
// Setting maxVolumesPerNode using the value of field maxPowerstoreVolumesPerNode specified in values.yaml
if s.opts.MaxVolumesPerNode > 0 {
maxVolumesPerNode = s.opts.MaxVolumesPerNode
}
// Check for node label 'max-powerstore-volumes-per-node'. If present set 'maxVolumesPerNode' to this value.
labels, err := k8sutils.GetNodeLabels(ctx, s.opts.KubeConfigPath, s.opts.KubeNodeName)
if err != nil {
log.Warnf("failed to get Node Labels with error: %s", err.Error())
} else if labels != nil {
if val, ok := labels[maxPowerstoreVolumesPerNodeLabel]; ok {
maxVols, err := strconv.ParseInt(val, 10, 64)
if err != nil {
log.Warnf("invalid value '%s' specified for 'max-powerstore-volumes-per-node' node label", val)
} else if maxVols > 0 {
maxVolumesPerNode = maxVols
log.Infof("node label 'max-powerstore-volumes-per-node' is available and is set to value '%d'", maxVolumesPerNode)
}
}
}
if maxVolumesPerNode >= 0 {
resp.MaxVolumesPerNode = maxVolumesPerNode
log.Infof("Setting MaxVolumesPerNode to '%d'", maxVolumesPerNode)
}
return resp, nil
}
func (s *Service) updateNodeID() error {
if s.nodeID == "" {
hostID, err := s.Fs.ReadFile(s.opts.NodeIDFilePath)
if err != nil {
log.WithFields(log.Fields{
"path": s.opts.NodeIDFilePath,
"error": err,
}).Error("Could not read Node ID file")
return status.Errorf(codes.FailedPrecondition, "Could not readNode ID file: %s", err.Error())
}
// Check connection to array and get ip
defaultArray := s.DefaultArray()
if defaultArray == nil {
return status.Errorf(codes.FailedPrecondition, "Could not fetch default PowerStore array")
}
ip, err := getOutboundIP(s.DefaultArray().GetIP(), s.Fs)
if err != nil {
log.WithFields(log.Fields{
"endpoint": s.DefaultArray().GetIP(),
"error": err,
}).Error("Could not connect to PowerStore array")
return status.Errorf(codes.FailedPrecondition, "Could not connect to PowerStore array: %s", err.Error())
}
nodeID := fmt.Sprintf(
"%s-%s-%s", s.opts.NodeNamePrefix, strings.TrimSpace(string(hostID)), ip.String(),
)
if len(nodeID) > powerStoreMaxNodeNameLength {
err := errors.New("node name prefix is too long")
log.WithFields(log.Fields{
"value": s.opts.NodeNamePrefix,
"error": err,
}).Error("Invalid Node ID")
return err
}
s.nodeID = nodeID
}
return nil
}
func (s *Service) getInitiators() ([]string, []string, []string, error) {
ctx := context.Background()
var iscsiAvailable bool
var fcAvailable bool
var nvmeAvailable bool
iscsiInitiators, err := s.iscsiConnector.GetInitiatorName(ctx)
if err != nil {
log.Error("nodeStartup could not GetInitiatorIQNs")
} else if len(iscsiInitiators) == 0 {
log.Error("iscsi initiators not found on node")
} else {
log.Debug("iscsi initiators found on node")
iscsiAvailable = true
}
fcInitiators, err := s.getNodeFCPorts(ctx)
if err != nil {
log.Error("nodeStartup could not FC initiators for node")
} else if len(fcInitiators) == 0 {
log.Error("FC was not found or filtered with FCPortsFilterFile")
} else {
log.Debug("FC initiators found on node")
fcAvailable = true
}
nvmeInitiators, err := s.nvmeConnector.GetInitiatorName(ctx)
if err != nil {
log.Error("nodeStartup could not get Initiator NQNs")
} else if len(nvmeInitiators) == 0 {
log.Error("NVMe initiators not found on node")
} else {
log.Debug("NVMe initiators found on node")
nvmeAvailable = true
}
if !iscsiAvailable && !fcAvailable && !nvmeAvailable {
// If we haven't found any initiators we still can use NFS
log.Info("FC, iSCSI and NVMe initiators not found on node")
}
return iscsiInitiators, fcInitiators, nvmeInitiators, nil
}
func (s *Service) getNodeFCPorts(ctx context.Context) ([]string, error) {
var err error
var initiators []string
defer func() {
initiators := initiators
log.Infof("FC initiators found: %s", initiators)
}()
rawInitiatorsData, err := s.fcConnector.GetInitiatorPorts(ctx)
if err != nil {
log.Error("failed FC initiators list from node")
return nil, err
}
for _, initiator := range rawInitiatorsData {
data, err := formatWWPN(strings.TrimPrefix(initiator, "0x"))
if err != nil {
return nil, err
}
initiators = append(initiators, data)
}
if len(initiators) == 0 {
return initiators, nil
}
portsFilter, _ := s.readFCPortsFilterFile()
if len(portsFilter) == 0 {
return initiators, nil
}
var filteredInitiators []string
for _, filterValue := range portsFilter {
for _, initiator := range initiators {
if initiator != filterValue {
continue
}
log.Infof("FC initiator port %s match filter", initiator)
filteredInitiators = append(filteredInitiators, initiator)
}
}
initiators = filteredInitiators
return initiators, nil
}
func (s *Service) readFCPortsFilterFile() ([]string, error) {
if s.opts.FCPortsFilterFilePath == "" {
return nil, nil
}
data, err := s.Fs.ReadFile(s.opts.FCPortsFilterFilePath)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, err
}
if len(data) == 0 {
return nil, nil
}
var result []string
wwpns := strings.Split(strings.TrimSpace(string(data)), ",")
for _, p := range wwpns {
if !strings.Contains(p, ":") {
log.Error("invalid FCPortsFilterFile format")
return nil, nil
}
result = append(result, p)
}
return result, nil
}
func (s *Service) setupHost(initiators []string, client gopowerstore.Client, arrayIP string) error {
log.Infof("setting up host on %s", arrayIP)
defer log.Infof("finished setting up host on %s", arrayIP)
if s.nodeID == "" {
return fmt.Errorf("nodeID not set")
}
reqInitiators := s.buildInitiatorsArray(initiators)
var host *gopowerstore.Host
updateCHAP := false
h, err := client.GetHostByName(context.Background(), s.nodeID)
if err == nil {
err := s.updateHost(context.Background(), initiators, client, h)
if err != nil {
return err
}
if s.opts.EnableCHAP && len(h.Initiators) > 0 && (h.Initiators[0].ChapSingleUsername == "" || h.Initiators[0].ChapSingleUsername == "admin") {
log.Debug("CHAP was enabled earlier, modifying credentials")
err := s.modifyHostInitiators(context.Background(), h.ID, client, nil, nil, initiators)
if err != nil {
return fmt.Errorf("can't modify initiators CHAP credentials %s", err.Error())
}
}
s.initialized = true
return nil
}
hosts, err := client.GetHosts(context.Background())
if err != nil {
log.Error(err.Error())
return err
}
for i, h := range hosts {
found := false
for _, hI := range h.Initiators {
for _, rI := range reqInitiators {
if hI.PortName == *rI.PortName && hI.PortType == *rI.PortType {
log.Info("Found existing host ", h.Name, hI.PortName, hI.PortType)
updateCHAP = s.opts.EnableCHAP && (hI.ChapSingleUsername == "" || hI.ChapSingleUsername == "admin")
found = true
break
}
}
if found {
break
}
}
if found {
host = &hosts[i]
break
}
}
if host == nil {
// register node on PowerStore
_, err := s.createHost(context.Background(), initiators, client)
if err != nil {
log.Error(err.Error())
return err
}
} else {
// node already registered
if updateCHAP { // add CHAP credentials if they aren't available
err := s.modifyHostInitiators(context.Background(), host.ID, client, nil, nil, initiators)
if err != nil {
return fmt.Errorf("can't modify initiators CHAP credentials %s", err.Error())
}
}
// Modify the host entry with the new node ID having different prefix
err := s.modifyHostName(context.Background(), client, s.nodeID, host.ID)
if err != nil {
return fmt.Errorf("cannot update the host name %s", err.Error())
}
s.reusedHost = true
}
s.initialized = true
return nil
}
func (s *Service) modifyHostName(ctx context.Context, client gopowerstore.Client, nodeID string, hostID string) error {
modifyParams := gopowerstore.HostModify{}
modifyParams.Name = &nodeID
_, err := client.ModifyHost(ctx, &modifyParams, hostID)
if err != nil {
return err
}
log.Info("Updated nodeID ", nodeID)
return nil
}
func (s *Service) buildInitiatorsArray(initiators []string) []gopowerstore.InitiatorCreateModify {
var portType gopowerstore.InitiatorProtocolTypeEnum
if s.useNVME {
portType = gopowerstore.InitiatorProtocolTypeEnumNVME
} else if s.useFC {
portType = gopowerstore.InitiatorProtocolTypeEnumFC
} else {
portType = gopowerstore.InitiatorProtocolTypeEnumISCSI
}
initiatorsReq := make([]gopowerstore.InitiatorCreateModify, len(initiators))
for i, iqn := range initiators {
iqn := iqn
if !s.useFC && s.opts.EnableCHAP {
initiatorsReq[i] = gopowerstore.InitiatorCreateModify{
ChapSinglePassword: &s.opts.CHAPPassword,
ChapSingleUsername: &s.opts.CHAPUsername,
PortName: &iqn,
PortType: &portType,
}
} else {
initiatorsReq[i] = gopowerstore.InitiatorCreateModify{
PortName: &iqn,
PortType: &portType,
}
}
}
return initiatorsReq
}
// create or update host on PowerStore array
func (s *Service) updateHost(ctx context.Context, initiators []string, client gopowerstore.Client, host gopowerstore.Host) (err error) {
initiatorsToAdd, initiatorsToDelete := checkIQNS(initiators, host)
return s.modifyHostInitiators(ctx, host.ID, client, initiatorsToAdd, initiatorsToDelete, nil)
}
// register host
func (s *Service) createHost(ctx context.Context, initiators []string, client gopowerstore.Client) (id string, err error) {
osType := gopowerstore.OSTypeEnumLinux
reqInitiators := s.buildInitiatorsArray(initiators)
description := fmt.Sprintf("k8s node: %s", s.opts.KubeNodeName)
var createParams gopowerstore.HostCreate
defaultHeaders := client.GetCustomHTTPHeaders()
if defaultHeaders == nil {
defaultHeaders = make(http.Header)
}
customHeaders := defaultHeaders
k8sMetadataSupported := common.IsK8sMetadataSupported(client)
if k8sMetadataSupported {
customHeaders.Add("DELL-VISIBILITY", "internal")
client.SetCustomHTTPHeaders(customHeaders)
if s.opts.KubeNodeName == "" {
log.Warnf("KubeNodeName value is not set")
createParams = gopowerstore.HostCreate{
Name: &s.nodeID, OsType: &osType, Initiators: &reqInitiators,
Description: &description,
}
} else {
metadata := map[string]string{
"k8s_node_name": s.opts.KubeNodeName,
}
createParams = gopowerstore.HostCreate{
Name: &s.nodeID, OsType: &osType, Initiators: &reqInitiators,
Description: &description, Metadata: &metadata,
}
}
} else {
createParams = gopowerstore.HostCreate{
Name: &s.nodeID, OsType: &osType, Initiators: &reqInitiators,
Description: &description,
}
}
resp, err := client.CreateHost(ctx, &createParams)
// reset custom header
customHeaders.Del("DELL-VISIBILITY")
client.SetCustomHTTPHeaders(customHeaders)
if err != nil {
return id, err
}
return resp.ID, err
}
// add or remove initiators from host
func (s *Service) modifyHostInitiators(ctx context.Context, hostID string, client gopowerstore.Client,
initiatorsToAdd []string, initiatorsToDelete []string, initiatorsToModify []string,
) error {
if len(initiatorsToDelete) > 0 {
modifyParams := gopowerstore.HostModify{}
modifyParams.RemoveInitiators = &initiatorsToDelete
_, err := client.ModifyHost(ctx, &modifyParams, hostID)
if err != nil {
return err
}
}
if len(initiatorsToAdd) > 0 {
modifyParams := gopowerstore.HostModify{}
initiators := s.buildInitiatorsArray(initiatorsToAdd)
modifyParams.AddInitiators = &initiators
_, err := client.ModifyHost(ctx, &modifyParams, hostID)
if err != nil {
return err
}
}
if len(initiatorsToModify) > 0 {
modifyParams := gopowerstore.HostModify{}
initiators := s.buildInitiatorsArrayModify(initiatorsToModify)
modifyParams.ModifyInitiators = &initiators
_, err := client.ModifyHost(ctx, &modifyParams, hostID)
if err != nil {
return err
}
}
return nil
}
func checkIQNS(IQNs []string, host gopowerstore.Host) (iqnToAdd, iqnToDelete []string) {
// create map with initiators which are already exist
initiatorMap := make(map[string]bool)
for _, initiator := range host.Initiators {
initiatorMap[initiator.PortName] = false
}
for _, iqn := range IQNs {
_, ok := initiatorMap[iqn]
if ok {
// the iqn should be left in the host
initiatorMap[iqn] = true
} else {
// the iqn should be added to the host
iqnToAdd = append(iqnToAdd, iqn)
}
}
// find iqns to delete from host
for iqn, found := range initiatorMap {
if !found {
iqnToDelete = append(iqnToDelete, iqn)
}
}
return
}
func (s *Service) buildInitiatorsArrayModify(initiators []string) []gopowerstore.UpdateInitiatorInHost {
initiatorsReq := make([]gopowerstore.UpdateInitiatorInHost, len(initiators))
for i, iqn := range initiators {
iqn := iqn
if !s.useFC && s.opts.EnableCHAP {
initiatorsReq[i] = gopowerstore.UpdateInitiatorInHost{
ChapSinglePassword: &s.opts.CHAPPassword,
ChapSingleUsername: &s.opts.CHAPUsername,
PortName: &iqn,
}
} else {
initiatorsReq[i] = gopowerstore.UpdateInitiatorInHost{
PortName: &iqn,
}
}
}
return initiatorsReq
}
func (s *Service) fileExists(filename string) bool {
_, err := s.Fs.Stat(filename)
if err == nil {
return true
}
if os.IsNotExist(err) {
log.WithFields(log.Fields{
"filename": filename,
"error": err,
}).Error("File doesn't exist")
} else {
log.WithFields(log.Fields{
"filename": filename,
"error": err,
}).Error("Error while checking stat of file")
}
return false
}
/*
*
* Copyright © 2022-2023 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package node
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/dell/csi-powerstore/v2/pkg/array"
"github.com/dell/csi-powerstore/v2/pkg/common"
"github.com/dell/goiscsi"
"github.com/dell/gonvme"
"github.com/dell/gopowerstore"
"github.com/gorilla/mux"
log "github.com/sirupsen/logrus"
)
// pollingFrequency in seconds
var pollingFrequencyInSeconds int64
// probeStatus map[string]ArrayConnectivityStatus
var probeStatus *sync.Map
// startAPIService reads nodes to array status periodically
func (s *Service) startAPIService(ctx context.Context) {
if !s.isPodmonEnabled {
log.Info("podmon is not enabled")
return
}
pollingFrequencyInSeconds = common.SetPollingFrequency(ctx)
s.startNodeToArrayConnectivityCheck(ctx)
s.apiRouter(ctx)
}
// apiRouter serves http requests
func (s *Service) apiRouter(_ context.Context) {
log.Infof("starting http server on port %s", common.APIPort)
// create a new mux router
router := mux.NewRouter()
// route to connectivity status
// connectivityStatus is the handlers
router.HandleFunc(common.ArrayStatus, connectivityStatus).Methods("GET")
router.HandleFunc(common.ArrayStatus+"/"+"{arrayId}", getArrayConnectivityStatus).Methods("GET")
// start http server to serve requests
server := &http.Server{
Addr: common.APIPort,
Handler: router,
ReadTimeout: common.Timeout,
WriteTimeout: common.Timeout,
}
err := server.ListenAndServe()
if err != nil {
log.Errorf("unable to start http server to serve status requests due to %s", err)
}
}
// connectivityStatus handler returns array connectivity status
func connectivityStatus(w http.ResponseWriter, _ *http.Request) {
log.Infof("connectivityStatus called, status is %v \n", probeStatus)
// w.Header().Set("Content-Type", "application/json")
if probeStatus == nil {
log.Errorf("error probeStatus map in cache is empty")
w.WriteHeader(http.StatusInternalServerError)
w.Header().Set("Content-Type", "application/json")
return
}
// convert struct to JSON
log.Debugf("ProbeStatus fetched from the cache has %+v", probeStatus)
jsonResponse, err := MarshalSyncMapToJSON(probeStatus)
if err != nil {
log.Errorf("error %s during marshaling to json", err)
w.WriteHeader(http.StatusInternalServerError)
w.Header().Set("Content-Type", "application/json")
return
}
log.Info("sending connectivityStatus for all arrays ")
w.Header().Set("Content-Type", "application/json")
_, err = w.Write(jsonResponse)
if err != nil {
log.Errorf("unable to write response %s", err)
}
}
// MarshalSyncMapToJSON marshal the sync Map to Json
func MarshalSyncMapToJSON(m *sync.Map) ([]byte, error) {
tmpMap := make(map[string]common.ArrayConnectivityStatus)
m.Range(func(k, value interface{}) bool {
// this check is not necessary but just in case is someone in future play around this
switch value.(type) {
case common.ArrayConnectivityStatus:
tmpMap[k.(string)] = value.(common.ArrayConnectivityStatus)
return true
default:
log.Errorf("invalid data is stored in cache")
return false
}
})
log.Debugf("map value is %+v", tmpMap)
if len(tmpMap) == 0 {
return nil, fmt.Errorf("invalid data is stored in cache")
}
return json.Marshal(tmpMap)
}
// getArrayConnectivityStatus handler lists status of the requested array
func getArrayConnectivityStatus(w http.ResponseWriter, r *http.Request) {
arrayID := mux.Vars(r)["arrayId"]
log.Infof("GetArrayConnectivityStatus called for array %s \n", arrayID)
status, found := probeStatus.Load(arrayID)
if !found {
// specify status code
w.WriteHeader(http.StatusNotFound)
w.Header().Set("Content-Type", "application/json")
// update response writer
fmt.Fprintf(w, "array %s not found \n", arrayID)
return
}
// convert status struct to JSON
jsonResponse, err := json.Marshal(status)
if err != nil {
log.Errorf("error %s during marshaling to json", err)
w.WriteHeader(http.StatusInternalServerError)
w.Header().Set("Content-Type", "application/json")
return
}
log.Infof("sending response %+v for array %s \n", status, arrayID)
// update response
_, err = w.Write(jsonResponse)
if err != nil {
log.Errorf("unable to write response %s", err)
}
}
// startNodeToArrayConnectivityCheck starts connectivityTest as one goroutine for each array
func (s *Service) startNodeToArrayConnectivityCheck(ctx context.Context) {
log.Debug("startNodeToArrayConnectivityCheck called")
probeStatus = new(sync.Map)
// in case if we want to store the status of default array, uncomment below line
// powerStoreArray := s.DefaultArray()
powerStoreArray := s.Arrays()
for _, array := range powerStoreArray {
// start one goroutine for each array, so each array's nodeProbe run concurrently
// should we really store the status of all array instead of default one, currently podman query only default array?
go s.testConnectivityAndUpdateStatus(ctx, array, common.Timeout)
}
log.Infof("startNodeToArrayConnectivityCheck is running probes at pollingFrequency %d ", pollingFrequencyInSeconds/2)
}
// testConnectivityAndUpdateStatus runs probe to test connectivity from node to array
// updates probeStatus map[array]ArrayConnectivityStatus
func (s *Service) testConnectivityAndUpdateStatus(ctx context.Context, array *array.PowerStoreArray, timeout time.Duration) {
defer func() {
if err := recover(); err != nil {
log.Errorf("panic occurred in testConnectivityAndUpdateStatus: %s for array having %s", err, array.GlobalID)
}
// if panic occurs restart new goroutine
go s.testConnectivityAndUpdateStatus(ctx, array, timeout)
}()
var status common.ArrayConnectivityStatus
for {
// add timeout to context
timeOutCtx, cancel := context.WithTimeout(ctx, timeout)
log.Debugf("Running probe for array %s at time %v \n", array.GlobalID, time.Now())
if existingStatus, ok := probeStatus.Load(array.GlobalID); !ok {
log.Debugf("%s not in probeStatus ", array.GlobalID)
} else {
if status, ok = existingStatus.(common.ArrayConnectivityStatus); !ok {
log.Errorf("failed to extract ArrayConnectivityStatus for array '%s'", array.GlobalID)
}
}
// for the first time status will not be there.
log.Debugf("array %s , status is %+v", array.GlobalID, status)
// run nodeProbe to test connectivity
err := s.nodeProbe(timeOutCtx, array)
if err == nil {
log.Debugf("Probe successful for %s", array.GlobalID)
status.LastSuccess = time.Now().Unix()
} else {
log.Debugf("Probe failed for array '%s' error:'%s'", array.GlobalID, err)
}
status.LastAttempt = time.Now().Unix()
log.Debugf("array %s , storing status %+v", array.GlobalID, status)
probeStatus.Store(array.GlobalID, status)
cancel()
// sleep for half the pollingFrequency and run check again
time.Sleep(time.Second * time.Duration(pollingFrequencyInSeconds/2))
}
}
// nodeProbe function used to store the status of array
func (s *Service) nodeProbe(_ context.Context, array *array.PowerStoreArray) error {
// try to get the host
host, err := array.Client.GetHostByName(context.Background(), s.nodeID)
// possibly NFS could be there.
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.NotFound() && s.useNFS {
log.Debugf("Error %s, while probing %s but since it's NFS this is expected", err.Error(), array.GlobalID)
return nil
}
// nodeId is not right or it's not NFS and still host is not preset
log.Infof("Error %s, while probing %s", err.Error(), array.GlobalID)
return err
}
log.Debugf("Successfully got Host on %s", array.GlobalID)
s.populateTargetsInCache(array)
// check if nvme sessions are active
if s.useNVME {
log.Debugf("Checking if nvme sessions are active on node or not")
sessions, _ := s.nvmeLib.GetSessions()
for _, target := range s.nvmeTargets[array.GlobalID] {
for _, session := range sessions {
log.Debugf("matching %v with %v", target, session)
if session.Target == target && session.NVMESessionState == gonvme.NVMESessionStateLive {
if s.useNFS {
s.useNFS = false
}
return nil
}
}
}
if s.useNFS {
log.Infof("Host Entry found but failed to login to nvme target, seems to be this worker has only NFS")
return nil
}
return fmt.Errorf("no active nvme sessions")
} else if s.useFC {
log.Debugf("Checking if FC sessions are active on node or not")
for _, initiator := range host.Initiators {
if len(initiator.ActiveSessions) > 0 {
return nil
}
}
return fmt.Errorf("no active fc sessions")
}
// check if iscsi sessions are active
// if !s.useNVME && !s.useFC {
log.Debugf("Checking if iscsi sessions are active on node or not")
sessions, _ := s.iscsiLib.GetSessions()
for _, target := range s.iscsiTargets[array.GlobalID] {
for _, session := range sessions {
log.Debugf("matching %v with %v", target, session)
if session.Target == target && session.ISCSISessionState == goiscsi.ISCSISessionStateLOGGEDIN {
if s.useNFS {
s.useNFS = false
}
return nil
}
}
}
if s.useNFS {
log.Infof("Host Entry found but failed to login to iscsi target, seems to be this worker has only NFS")
return nil
}
return fmt.Errorf("no active iscsi sessions")
}
// populateTargetsInCache checks if nvmeTargets or iscsiTargets in cache is empty, try to fetch the targets from array and populate the cache
func (s *Service) populateTargetsInCache(array *array.PowerStoreArray) {
// if nvmeTargets in cache is empty
// this could be empty in 2 cases: Either container is getting restarted or discovery & login has failed in NodeGetInfo
if s.useNVME {
if len(s.nvmeTargets[array.GlobalID]) != 0 {
return
}
// for NVMeFC
if s.useFC {
nvmefcInfo, err := common.GetNVMEFCTargetInfoFromStorage(array.GetClient(), "")
if err != nil {
log.Errorf("couldn't get targets from the array: %s", err.Error())
return
}
for _, info := range nvmefcInfo {
NVMeFCTargets, err := s.nvmeLib.DiscoverNVMeFCTargets(info.Portal, false)
if err != nil {
log.Errorf("couldn't discover NVMeFC targets")
continue
}
for _, target := range NVMeFCTargets {
otherTargets := s.nvmeTargets[array.GlobalID]
s.nvmeTargets[array.GlobalID] = append(otherTargets, target.TargetNqn)
}
break
}
} else {
// for NVMeTCP
infoList, err := common.GetNVMETCPTargetsInfoFromStorage(array.GetClient(), "")
if err != nil {
log.Errorf("couldn't get targets from array: %s", err.Error())
return
}
for _, address := range infoList {
nvmeIP := strings.Split(address.Portal, ":")
log.Info("Trying to discover NVMe target from portal ", nvmeIP[0])
nvmeTargets, err := s.nvmeLib.DiscoverNVMeTCPTargets(nvmeIP[0], false)
if err != nil {
log.Error("couldn't discover targets")
continue
}
for _, target := range nvmeTargets {
otherTargets := s.nvmeTargets[array.GlobalID]
s.nvmeTargets[array.GlobalID] = append(otherTargets, target.TargetNqn)
}
break
}
}
} else if !s.useFC && !s.useNFS {
// if iscsiTargets in cache is empty
if len(s.iscsiTargets[array.GlobalID]) != 0 {
return
}
infoList, err := common.GetISCSITargetsInfoFromStorage(array.GetClient(), "")
if err != nil {
log.Errorf("couldn't get targets from array: %s", err.Error())
return
}
var iscsiTargets []goiscsi.ISCSITarget
for _, address := range infoList {
// first check if this portal is reachable from this machine or not
if ReachableEndPoint(address.Portal) {
// doesn't matter how many portals are present, discovering from any one will list out all targets
log.Info("Trying to discover iSCSI target from portal ", address.Portal)
iscsiTargets, err = s.iscsiLib.DiscoverTargets(address.Portal, false)
if err != nil {
log.Error("couldn't discover targets")
continue
}
for _, target := range iscsiTargets {
otherTargets := s.iscsiTargets[array.GlobalID]
s.iscsiTargets[array.GlobalID] = append(otherTargets, target.Target)
}
break
}
log.Debugf("Portal %s is not rechable from the node", address.Portal)
}
}
}
/*
*
* Copyright © 2021-2023 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package node
import (
"context"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/dell/csi-powerstore/v2/pkg/common/fs"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// VolumePublisher allows to node publish a volume
type VolumePublisher interface {
Publish(ctx context.Context, logFields log.Fields, fs fs.Interface,
cap *csi.VolumeCapability, isRO bool, targetPath string, stagingPath string) (*csi.NodePublishVolumeResponse, error)
}
// SCSIPublisher implementation of NodeVolumePublisher for SCSI based (FC, iSCSI) volumes
type SCSIPublisher struct {
isBlock bool
}
// Publish publishes volume as either raw block or mount by mounting it to the target path
func (sp *SCSIPublisher) Publish(ctx context.Context, logFields log.Fields, fs fs.Interface, cap *csi.VolumeCapability, isRO bool, targetPath string, stagingPath string) (*csi.NodePublishVolumeResponse, error) {
published, err := isAlreadyPublished(ctx, targetPath, getRWModeString(isRO), fs)
if err != nil {
return nil, err
}
if published {
return &csi.NodePublishVolumeResponse{}, nil
}
if sp.isBlock {
return sp.publishBlock(ctx, logFields, fs, cap, isRO, targetPath, stagingPath)
}
return sp.publishMount(ctx, logFields, fs, cap, isRO, targetPath, stagingPath)
}
func (sp *SCSIPublisher) publishBlock(ctx context.Context, logFields log.Fields, fs fs.Interface, _ *csi.VolumeCapability, isRO bool, targetPath string, stagingPath string) (*csi.NodePublishVolumeResponse, error) {
log.WithFields(logFields).Info("start publishing as block device")
if isRO {
return nil, status.Error(codes.InvalidArgument, "read only not supported for Block Volume")
}
if _, err := fs.MkFileIdempotent(targetPath); err != nil {
return nil, status.Errorf(codes.Internal,
"can't create target file %s: %s", targetPath, err.Error())
}
log.WithFields(logFields).Info("target path successfully created")
if err := fs.GetUtil().BindMount(ctx, stagingPath, targetPath); err != nil {
return nil, status.Errorf(codes.Internal,
"error bind disk %s to target path: %s", stagingPath, err.Error())
}
log.WithFields(logFields).Info("volume successfully binded")
return &csi.NodePublishVolumeResponse{}, nil
}
func (sp *SCSIPublisher) publishMount(ctx context.Context, logFields log.Fields, fs fs.Interface, cap *csi.VolumeCapability, isRO bool, targetPath string, stagingPath string) (*csi.NodePublishVolumeResponse, error) {
if cap.GetAccessMode().GetMode() == csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER {
// MULTI_WRITER not supported for mount volumes
return nil, status.Error(codes.Unimplemented, "Mount volumes do not support AccessMode MULTI_NODE_MULTI_WRITER")
}
if cap.GetAccessMode().GetMode() == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY {
// Warning in case of MULTI_NODE_READER_ONLY for mount volumes
log.Warningf("Mount volume with the AccessMode ReadOnlyMany")
}
var opts []string
mountCap := cap.GetMount()
mountFsType := mountCap.GetFsType()
mntFlags := mountCap.GetMountFlags()
if mountFsType == "xfs" {
mntFlags = append(mntFlags, "nouuid")
}
targetFS := mountCap.GetFsType()
if targetFS == "xfs" {
opts = []string{"-m", "crc=0,finobt=0"}
}
if err := fs.MkdirAll(targetPath, 0o750); err != nil {
return nil, status.Errorf(codes.Internal,
"can't create target dir with Mkdirall %s: %s", targetPath, err.Error())
}
log.WithFields(logFields).Info("target dir successfully created")
curFS, err := fs.GetUtil().GetDiskFormat(ctx, stagingPath)
if err != nil {
return nil, status.Errorf(codes.Internal,
"error while trying to detect fs for staging path %s: %s", stagingPath, err.Error())
}
if curFS != "" && targetFS != "" && curFS != targetFS {
return nil, status.Errorf(codes.FailedPrecondition,
"filesystem mismatch. Target device already formatted to %s mount spec require %s",
curFS, targetFS)
}
if curFS == "" {
log.WithFields(logFields).Infof("no filesystem found on staged disk %s", stagingPath)
if isRO {
return nil, status.Errorf(codes.FailedPrecondition,
"RO mount required but no fs detected on staged volume %s", stagingPath)
}
if err := format(ctx, stagingPath, targetFS, fs, opts...); err != nil {
return nil, status.Errorf(codes.Internal,
"can't format staged device %s: %s", stagingPath, err.Error())
}
log.WithFields(logFields).Infof("staged disk %s successfully formatted to %s", stagingPath, targetFS)
}
if isRO {
mntFlags = append(mntFlags, "ro")
}
if err := fs.GetUtil().Mount(ctx, stagingPath, targetPath, targetFS, mntFlags...); err != nil {
return nil, status.Errorf(codes.Internal,
"error performing mount for staging path %s: %s", stagingPath, err.Error())
}
log.WithFields(logFields).Info("volume successfully mounted")
return &csi.NodePublishVolumeResponse{}, nil
}
// NFSPublisher implementation of NodeVolumePublisher for NFS volumes
type NFSPublisher struct{}
// Publish publishes nfs volume by mounting it to the target path
func (np *NFSPublisher) Publish(ctx context.Context, logFields log.Fields, fs fs.Interface,
cap *csi.VolumeCapability, isRO bool, targetPath string, stagingPath string,
) (*csi.NodePublishVolumeResponse, error) {
published, err := isAlreadyPublished(ctx, targetPath, getRWModeString(isRO), fs)
if err != nil {
return nil, err
}
if published {
return &csi.NodePublishVolumeResponse{}, nil
}
if err := fs.MkdirAll(targetPath, 0o750); err != nil {
return nil, status.Errorf(codes.Internal,
"can't create target folder %s: %s", stagingPath, err.Error())
}
log.WithFields(logFields).Info("target path successfully created")
mountCap := cap.GetMount()
mntFlags := mountCap.GetMountFlags()
if isRO {
mntFlags = append(mntFlags, "ro")
}
if err := fs.GetUtil().BindMount(ctx, stagingPath, targetPath, mntFlags...); err != nil {
return nil, status.Errorf(codes.Internal,
"error bind disk %s to target path: %s", stagingPath, err.Error())
}
log.WithFields(logFields).Info("volume successfully binded")
return &csi.NodePublishVolumeResponse{}, nil
}
/*
*
* Copyright © 2021-2023 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package node
import (
"context"
"fmt"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/dell/csi-powerstore/v2/pkg/array"
"github.com/dell/csi-powerstore/v2/pkg/common"
"github.com/dell/csi-powerstore/v2/pkg/common/fs"
"github.com/dell/gobrick"
"github.com/dell/gopowerstore"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const (
procMountsPath = "/proc/self/mountinfo"
procMountsRetries = 15
)
// VolumeStager allows to node stage a volume
type VolumeStager interface {
Stage(ctx context.Context, req *csi.NodeStageVolumeRequest, logFields log.Fields, fs fs.Interface, id string) (*csi.NodeStageVolumeResponse, error)
}
// ReachableEndPoint checks if the endpoint is reachable or not
var ReachableEndPoint = common.ReachableEndPoint
// SCSIStager implementation of NodeVolumeStager for SCSI based (FC, iSCSI) volumes
type SCSIStager struct {
useFC bool
useNVME bool
iscsiConnector ISCSIConnector
nvmeConnector NVMEConnector
fcConnector FcConnector
}
// Stage stages volume by connecting it through either FC or iSCSI and creating bind mount to staging path
func (s *SCSIStager) Stage(ctx context.Context, req *csi.NodeStageVolumeRequest,
logFields log.Fields, fs fs.Interface, id string,
) (*csi.NodeStageVolumeResponse, error) {
// append additional path to be able to do bind mounts
stagingPath := getStagingPath(ctx, req.GetStagingTargetPath(), id)
publishContext, err := readSCSIInfoFromPublishContext(req.PublishContext, s.useFC, s.useNVME)
if err != nil {
return nil, err
}
logFields["ID"] = id
if s.useNVME {
if s.useFC {
logFields["Targets"] = publishContext.nvmefcTargets
} else {
logFields["Targets"] = publishContext.nvmetcpTargets
}
} else {
logFields["Targets"] = publishContext.iscsiTargets
}
logFields["WWN"] = publishContext.deviceWWN
logFields["Lun"] = publishContext.volumeLUNAddress
logFields["StagingPath"] = stagingPath
ctx = common.SetLogFields(ctx, logFields)
found, ready, err := isReadyToPublish(ctx, stagingPath, fs)
if err != nil {
return nil, err
}
if ready {
log.WithFields(logFields).Info("device already staged")
return &csi.NodeStageVolumeResponse{}, nil
} else if found {
log.WithFields(logFields).Warning("volume found in staging path but it is not ready for publish," +
"try to unmount it and retry staging again")
_, err := unstageVolume(ctx, stagingPath, id, logFields, err, fs)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to unmount volume: %s", err.Error())
}
}
devicePath, err := s.connectDevice(ctx, publishContext)
if err != nil {
return nil, err
}
logFields["DevicePath"] = devicePath
log.WithFields(logFields).Info("start staging")
if _, err := fs.MkFileIdempotent(stagingPath); err != nil {
return nil, status.Errorf(codes.Internal, "can't create target file %s: %s",
stagingPath, err.Error())
}
log.WithFields(logFields).Info("target path successfully created")
if err := fs.GetUtil().BindMount(ctx, devicePath, stagingPath); err != nil {
return nil, status.Errorf(codes.Internal,
"error bind disk %s to target path: %s", devicePath, err.Error())
}
log.WithFields(logFields).Info("stage complete")
return &csi.NodeStageVolumeResponse{}, nil
}
// NFSStager implementation of NodeVolumeStager for NFS volumes
type NFSStager struct {
array *array.PowerStoreArray
}
// Stage stages volume by mounting volumes as nfs to the staging path
func (n *NFSStager) Stage(ctx context.Context, req *csi.NodeStageVolumeRequest,
logFields log.Fields, fs fs.Interface, id string,
) (*csi.NodeStageVolumeResponse, error) {
// append additional path to be able to do bind mounts
stagingPath := getStagingPath(ctx, req.GetStagingTargetPath(), id)
hostIP := req.PublishContext[common.KeyHostIP]
exportID := req.PublishContext[common.KeyExportID]
nfsExport := req.PublishContext[common.KeyNfsExportPath]
allowRoot := req.PublishContext[common.KeyAllowRoot]
nasName := req.PublishContext[common.KeyNasName]
natIP := ""
if ip, ok := req.PublishContext[common.KeyNatIP]; ok {
natIP = ip
}
logFields["NfsExportPath"] = nfsExport
logFields["StagingPath"] = req.GetStagingTargetPath()
logFields["ID"] = id
logFields["AllowRoot"] = allowRoot
logFields["ExportID"] = exportID
logFields["HostIP"] = hostIP
logFields["NatIP"] = natIP
logFields["NFSv4ACLs"] = req.PublishContext[common.KeyNfsACL]
logFields["NasName"] = nasName
ctx = common.SetLogFields(ctx, logFields)
found, err := isReadyToPublishNFS(ctx, stagingPath, fs)
if err != nil {
return nil, err
}
if found {
log.WithFields(logFields).Info("device already staged")
return &csi.NodeStageVolumeResponse{}, nil
}
if err := fs.MkdirAll(stagingPath, 0o750); err != nil {
return nil, status.Errorf(codes.Internal,
"can't create target folder %s: %s", stagingPath, err.Error())
}
log.WithFields(logFields).Info("stage path successfully created")
if err := fs.GetUtil().Mount(ctx, nfsExport, stagingPath, ""); err != nil {
return nil, status.Errorf(codes.Internal,
"error mount nfs share %s to target path: %s", nfsExport, err.Error())
}
// Create folder with 1777 in nfs share so every user can use it
if err := fs.MkdirAll(filepath.Join(stagingPath, commonNfsVolumeFolder), 0o750); err != nil {
return nil, status.Errorf(codes.Internal,
"can't create common folder %s: %s", filepath.Join(stagingPath, "volume"), err.Error())
}
mode := os.ModePerm
acls := req.PublishContext[common.KeyNfsACL]
aclsConfigured := false
if acls != "" {
if posixMode(acls) {
perm, err := strconv.ParseUint(acls, 8, 32)
if err == nil {
mode = os.FileMode(perm) // #nosec: G115 false positive
} else {
log.WithFields(logFields).Warn("can't parse file mode, invalid mode specified. Default mode permissions will be set.")
}
} else {
aclsConfigured, err = validateAndSetACLs(ctx, &NFSv4ACLs{}, nasName, n.array.GetClient(), acls, filepath.Join(stagingPath, commonNfsVolumeFolder))
if err != nil || !aclsConfigured {
return nil, err
}
}
}
if !aclsConfigured {
if err := fs.Chmod(filepath.Join(stagingPath, commonNfsVolumeFolder), os.ModeSticky|mode); err != nil {
return nil, status.Errorf(codes.Internal,
"can't change permissions of folder %s: %s", filepath.Join(stagingPath, "volume"), err.Error())
}
}
if allowRoot == "false" {
log.WithFields(logFields).Info("removing allow root from nfs export")
var hostsToRemove []string
var hostsToAdd []string
hostsToRemove = append(hostsToRemove, hostIP+"/255.255.255.255")
hostsToAdd = append(hostsToAdd, hostIP)
if natIP != "" {
hostsToRemove = append(hostsToRemove, natIP)
hostsToAdd = append(hostsToAdd, natIP)
}
// Modify NFS export to RW with `root_squashing`
_, err = n.array.GetClient().ModifyNFSExport(ctx, &gopowerstore.NFSExportModify{
RemoveRWRootHosts: hostsToRemove,
AddRWHosts: hostsToAdd,
}, exportID)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); !(ok && apiError.NotFound()) {
return nil, status.Errorf(codes.Internal, "failure when modifying nfs export: %s", err.Error())
}
}
}
log.WithFields(logFields).Info("nfs share successfully mounted")
return &csi.NodeStageVolumeResponse{}, nil
}
type scsiPublishContextData struct {
deviceWWN string
volumeLUNAddress string
iscsiTargets []gobrick.ISCSITargetInfo
nvmetcpTargets []gobrick.NVMeTargetInfo
nvmefcTargets []gobrick.NVMeTargetInfo
fcTargets []gobrick.FCTargetInfo
}
func readSCSIInfoFromPublishContext(publishContext map[string]string, useFC bool, useNVMe bool) (scsiPublishContextData, error) {
// Get publishContext
var data scsiPublishContextData
deviceWWN, ok := publishContext[common.PublishContextDeviceWWN]
if !ok {
return data, status.Error(codes.InvalidArgument, "deviceWWN must be in publish context")
}
volumeLUNAddress, ok := publishContext[common.PublishContextLUNAddress]
if !ok {
return data, status.Error(codes.InvalidArgument, "volumeLUNAddress must be in publish context")
}
iscsiTargets := readISCSITargetsFromPublishContext(publishContext)
if len(iscsiTargets) == 0 && !useFC && !useNVMe {
return data, status.Error(codes.InvalidArgument, "iscsiTargets data must be in publish context")
}
nvmeTCPTargets := readNVMETCPTargetsFromPublishContext(publishContext)
if len(nvmeTCPTargets) == 0 && useNVMe && !useFC {
return data, status.Error(codes.InvalidArgument, "NVMeTCP Targets data must be in publish context")
}
nvmeFCTargets := readNVMEFCTargetsFromPublishContext(publishContext)
if len(nvmeFCTargets) == 0 && useNVMe && useFC {
return data, status.Error(codes.InvalidArgument, "NVMeFC Targets data must be in publish context")
}
fcTargets := readFCTargetsFromPublishContext(publishContext)
if len(fcTargets) == 0 && useFC && !useNVMe {
return data, status.Error(codes.InvalidArgument, "fcTargets data must be in publish context")
}
return scsiPublishContextData{
deviceWWN: deviceWWN, volumeLUNAddress: volumeLUNAddress,
iscsiTargets: iscsiTargets, nvmetcpTargets: nvmeTCPTargets, nvmefcTargets: nvmeFCTargets, fcTargets: fcTargets,
}, nil
}
func readISCSITargetsFromPublishContext(pc map[string]string) []gobrick.ISCSITargetInfo {
var targets []gobrick.ISCSITargetInfo
for i := 0; ; i++ {
target := gobrick.ISCSITargetInfo{}
t, tfound := pc[fmt.Sprintf("%s%d", common.PublishContextISCSITargetsPrefix, i)]
if tfound {
target.Target = t
}
p, pfound := pc[fmt.Sprintf("%s%d", common.PublishContextISCSIPortalsPrefix, i)]
if pfound {
target.Portal = p
}
if !tfound || !pfound {
break
}
if ReachableEndPoint(p) {
// if the portals from the context (set in ControllerPublishVolume) is not reachable from the nodes
targets = append(targets, target)
}
}
log.Infof("iSCSI iscsiTargets from context: %v", targets)
return targets
}
func readNVMETCPTargetsFromPublishContext(pc map[string]string) []gobrick.NVMeTargetInfo {
var targets []gobrick.NVMeTargetInfo
for i := 0; ; i++ {
target := gobrick.NVMeTargetInfo{}
t, tfound := pc[fmt.Sprintf("%s%d", common.PublishContextNVMETCPTargetsPrefix, i)]
if tfound {
target.Target = t
}
p, pfound := pc[fmt.Sprintf("%s%d", common.PublishContextNVMETCPPortalsPrefix, i)]
if pfound {
target.Portal = p
}
if !tfound || !pfound {
break
}
targets = append(targets, target)
}
log.Infof("NVMeTCP Targets from context: %v", targets)
return targets
}
func readNVMEFCTargetsFromPublishContext(pc map[string]string) []gobrick.NVMeTargetInfo {
var targets []gobrick.NVMeTargetInfo
for i := 0; ; i++ {
target := gobrick.NVMeTargetInfo{}
t, tfound := pc[fmt.Sprintf("%s%d", common.PublishContextNVMEFCTargetsPrefix, i)]
if tfound {
target.Target = t
}
p, pfound := pc[fmt.Sprintf("%s%d", common.PublishContextNVMEFCPortalsPrefix, i)]
if pfound {
target.Portal = p
}
if !tfound || !pfound {
break
}
targets = append(targets, target)
}
log.Infof("NVMeFC Targets from context: %v", targets)
return targets
}
func readFCTargetsFromPublishContext(pc map[string]string) []gobrick.FCTargetInfo {
var targets []gobrick.FCTargetInfo
for i := 0; ; i++ {
wwpn, tfound := pc[fmt.Sprintf("%s%d", common.PublishContextFCWWPNPrefix, i)]
if !tfound {
break
}
targets = append(targets, gobrick.FCTargetInfo{WWPN: wwpn})
}
log.Infof("FC iscsiTargets from context: %v", targets)
return targets
}
func (s *SCSIStager) connectDevice(ctx context.Context, data scsiPublishContextData) (string, error) {
logFields := common.GetLogFields(ctx)
var err error
lun, err := strconv.Atoi(data.volumeLUNAddress)
if err != nil {
log.WithFields(logFields).Errorf("failed to convert lun number to int: %s", err.Error())
return "", status.Errorf(codes.Internal,
"failed to convert lun number to int: %s", err.Error())
}
wwn := data.deviceWWN
var device gobrick.Device
if s.useNVME {
device, err = s.connectNVMEDevice(ctx, wwn, data, s.useFC)
} else if s.useFC {
device, err = s.connectFCDevice(ctx, lun, data)
} else {
device, err = s.connectISCSIDevice(ctx, lun, data)
}
if err != nil {
log.WithFields(logFields).Errorf("Unable to find device after multiple discovery attempts: %s", err.Error())
return "", status.Errorf(codes.Internal,
"unable to find device after multiple discovery attempts: %s", err.Error())
}
devicePath := path.Join("/dev/", device.Name)
return devicePath, nil
}
func (s *SCSIStager) connectISCSIDevice(ctx context.Context,
lun int, data scsiPublishContextData,
) (gobrick.Device, error) {
logFields := common.GetLogFields(ctx)
var targets []gobrick.ISCSITargetInfo
for _, t := range data.iscsiTargets {
targets = append(targets, gobrick.ISCSITargetInfo{Target: t.Target, Portal: t.Portal})
}
// separate context to prevent 15 seconds cancel from kubernetes
connectorCtx, cFunc := context.WithTimeout(context.Background(), time.Second*120)
defer cFunc()
connectorCtx = common.SetLogFields(connectorCtx, logFields)
return s.iscsiConnector.ConnectVolume(connectorCtx, gobrick.ISCSIVolumeInfo{
Targets: targets,
Lun: lun,
})
}
func (s *SCSIStager) connectNVMEDevice(ctx context.Context,
wwn string, data scsiPublishContextData, useFC bool,
) (gobrick.Device, error) {
logFields := common.GetLogFields(ctx)
var targets []gobrick.NVMeTargetInfo
if useFC {
for _, t := range data.nvmefcTargets {
targets = append(targets, gobrick.NVMeTargetInfo{Target: t.Target, Portal: t.Portal})
}
} else {
for _, t := range data.nvmetcpTargets {
targets = append(targets, gobrick.NVMeTargetInfo{Target: t.Target, Portal: t.Portal})
}
}
// separate context to prevent 15 seconds cancel from kubernetes
connectorCtx, cFunc := context.WithTimeout(context.Background(), time.Second*120)
defer cFunc()
connectorCtx = common.SetLogFields(connectorCtx, logFields)
return s.nvmeConnector.ConnectVolume(connectorCtx, gobrick.NVMeVolumeInfo{
Targets: targets,
WWN: wwn,
}, useFC)
}
func (s *SCSIStager) connectFCDevice(ctx context.Context,
lun int, data scsiPublishContextData,
) (gobrick.Device, error) {
logFields := common.GetLogFields(ctx)
var targets []gobrick.FCTargetInfo
for _, t := range data.fcTargets {
targets = append(targets, gobrick.FCTargetInfo{WWPN: t.WWPN})
}
// separate context to prevent 15 seconds cancel from kubernetes
connectorCtx, cFunc := context.WithTimeout(context.Background(), time.Second*120)
defer cFunc()
connectorCtx = common.SetLogFields(connectorCtx, logFields)
return s.fcConnector.ConnectVolume(connectorCtx, gobrick.FCVolumeInfo{
Targets: targets,
Lun: lun,
})
}
func isReadyToPublish(ctx context.Context, stagingPath string, fs fs.Interface) (bool, bool, error) {
logFields := common.GetLogFields(ctx)
stageInfo, found, err := getTargetMount(ctx, stagingPath, fs)
if err != nil {
return found, false, err
}
if !found {
log.WithFields(logFields).Warning("staged device not found")
return found, false, nil
}
if strings.HasSuffix(stageInfo.Source, "deleted") {
log.WithFields(logFields).Warning("staged device linked with deleted path")
return found, false, nil
}
devFS, err := fs.GetUtil().GetDiskFormat(ctx, stagingPath)
if err != nil {
return found, false, err
}
return found, devFS != "mpath_member", nil
}
func isReadyToPublishNFS(ctx context.Context, stagingPath string, fs fs.Interface) (bool, error) {
logFields := common.GetLogFields(ctx)
stageInfo, found, err := getTargetMount(ctx, stagingPath, fs)
if err != nil {
return found, err
}
if !found {
log.WithFields(logFields).Warning("staged device not found")
return found, nil
}
if strings.HasSuffix(stageInfo.Source, "deleted") {
log.WithFields(logFields).Warning("staged device linked with deleted path")
return found, nil
}
return found, nil
}
/*
*
* Copyright © 2021 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package tracer provides OpenTracing tracer implementation
package tracer
import (
"io"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
jprom "github.com/uber/jaeger-lib/metrics/prometheus"
)
// Configurator represents tracer configurator
type Configurator interface {
FromEnv() (*config.Configuration, error)
}
// NewTracer returns a new tracer object
func NewTracer(configurator Configurator) (opentracing.Tracer, io.Closer, error) {
// load config from environment variables
cfg, err := configurator.FromEnv()
if err != nil {
return nil, nil, err
}
// create tracer from config
return cfg.NewTracer(
config.Metrics(jprom.New()),
)
}