Improve plugin to be loaded as interface

This commit is contained in:
Masaki Kimura 2019-06-24 21:52:53 +00:00
parent 2c9b076ea7
commit 3acab71987
5 changed files with 190 additions and 77 deletions

View File

@ -68,6 +68,10 @@ func main() {
} }
func handle() { func handle() {
d := nfs.NewNFSdriver(nodeID, endpoint, controllerPlugin) d, err := nfs.NewNFSdriver(nodeID, endpoint, controllerPlugin)
if err != nil {
fmt.Fprintf(os.Stderr, "%s", err.Error())
os.Exit(1)
}
d.Run() d.Run()
} }

View File

@ -1,10 +1,7 @@
package nfs package nfs
import ( import (
"plugin"
"github.com/container-storage-interface/spec/lib/go/csi" "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
@ -14,104 +11,85 @@ type ControllerServer struct {
Driver *nfsDriver Driver *nfsDriver
} }
func isSupported(pluginName string, symbolName string) bool {
symbol, err := lookupSymbol(pluginName, symbolName)
return err == nil && symbol != nil
}
func lookupSymbol(pluginName string, symbolName string) (interface{}, error) {
if pluginName != "" {
plug, err := plugin.Open(pluginName)
if err != nil {
glog.Infof("Failed to load plugin: %s error: %v", pluginName, err)
return nil, err
}
symbol, err := plug.Lookup(symbolName)
if err != nil {
glog.Infof("Failed to lookup symbol: %s error: %v", symbolName, err)
return nil, err
}
return symbol, nil
}
return nil, nil
}
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
glog.Infof("CreateVolume called") if plug, ok := cs.Driver.csPlugin.(CreateDeleteVolumeControllerPlugin); ok {
symbol, err := lookupSymbol(cs.Driver.controllerPlugin, "CreateVolume") return plug.CreateVolume(ctx, cs, req)
if err == nil && symbol != nil {
createVolume, ok := symbol.(func(cs *ControllerServer, ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error))
if ok {
return createVolume(cs, ctx, req)
}
} }
return nil, status.Error(codes.Unimplemented, "") return nil, status.Error(codes.Unimplemented, "")
} }
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
glog.Infof("DeleteVolume called") if plug, ok := cs.Driver.csPlugin.(CreateDeleteVolumeControllerPlugin); ok {
symbol, err := lookupSymbol(cs.Driver.controllerPlugin, "DeleteVolume") return plug.DeleteVolume(ctx, cs, req)
if err == nil && symbol != nil {
deleteVolume, ok := symbol.(func(cs *ControllerServer, ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error))
if ok {
return deleteVolume(cs, ctx, req)
}
} }
return nil, status.Error(codes.Unimplemented, "") return nil, status.Error(codes.Unimplemented, "")
} }
func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
if plug, ok := cs.Driver.csPlugin.(PublishUnpublishVolumeControllerPlugin); ok {
return plug.ControllerPublishVolume(ctx, cs, req)
}
return nil, status.Error(codes.Unimplemented, "") return nil, status.Error(codes.Unimplemented, "")
} }
func (cs *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { func (cs *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
if plug, ok := cs.Driver.csPlugin.(PublishUnpublishVolumeControllerPlugin); ok {
return plug.ControllerUnpublishVolume(ctx, cs, req)
}
return nil, status.Error(codes.Unimplemented, "") return nil, status.Error(codes.Unimplemented, "")
} }
func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
symbol, err := lookupSymbol(cs.Driver.controllerPlugin, "ValidateVolumeCapabilities") if plug, ok := cs.Driver.csPlugin.(ControllerPlugin); ok {
if err == nil && symbol != nil { return plug.ValidateVolumeCapabilities(ctx, cs, req)
validateVolumeCapabilities, ok := symbol.(func(cs *ControllerServer, ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error))
if ok {
return validateVolumeCapabilities(cs, ctx, req)
}
} }
return nil, status.Error(codes.Unimplemented, "") return nil, status.Error(codes.Unimplemented, "")
} }
func (cs *ControllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) { func (cs *ControllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
if plug, ok := cs.Driver.csPlugin.(ListVolumesControllerPlugin); ok {
return plug.ListVolumes(ctx, cs, req)
}
return nil, status.Error(codes.Unimplemented, "") return nil, status.Error(codes.Unimplemented, "")
} }
func (cs *ControllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) { func (cs *ControllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
if plug, ok := cs.Driver.csPlugin.(GetCapacityControllerPlugin); ok {
return plug.GetCapacity(ctx, cs, req)
}
return nil, status.Error(codes.Unimplemented, "") return nil, status.Error(codes.Unimplemented, "")
} }
// ControllerGetCapabilities implements the default GRPC callout.
// Default supports all capabilities
func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) { func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
glog.V(5).Infof("Using default ControllerGetCapabilities")
return &csi.ControllerGetCapabilitiesResponse{ return &csi.ControllerGetCapabilitiesResponse{
Capabilities: cs.Driver.cscap, Capabilities: cs.Driver.cscap,
}, nil }, nil
} }
func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) { func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
if plug, ok := cs.Driver.csPlugin.(SnapshotControllerPlugin); ok {
return plug.CreateSnapshot(ctx, cs, req)
}
return nil, status.Error(codes.Unimplemented, "") return nil, status.Error(codes.Unimplemented, "")
} }
func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) { func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
if plug, ok := cs.Driver.csPlugin.(SnapshotControllerPlugin); ok {
return plug.DeleteSnapshot(ctx, cs, req)
}
return nil, status.Error(codes.Unimplemented, "") return nil, status.Error(codes.Unimplemented, "")
} }
func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) { func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
if plug, ok := cs.Driver.csPlugin.(ListSnapshotControllerPlugin); ok {
return plug.ListSnapshots(ctx, cs, req)
}
return nil, status.Error(codes.Unimplemented, "") return nil, status.Error(codes.Unimplemented, "")
} }
func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
if plug, ok := cs.Driver.csPlugin.(ExpandVolumeControllerPlugin); ok {
return plug.ControllerExpandVolume(ctx, cs, req)
}
return nil, status.Error(codes.Unimplemented, "") return nil, status.Error(codes.Unimplemented, "")
} }

View File

@ -17,6 +17,9 @@ limitations under the License.
package nfs package nfs
import ( import (
"fmt"
"plugin"
"github.com/container-storage-interface/spec/lib/go/csi" "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog" "github.com/golang/glog"
) )
@ -26,48 +29,108 @@ type nfsDriver struct {
nodeID string nodeID string
version string version string
endpoint string endpoint string
controllerPlugin string
//ids *identityServer //ids *identityServer
ns *nodeServer ns *nodeServer
cap []*csi.VolumeCapability_AccessMode cap []*csi.VolumeCapability_AccessMode
cscap []*csi.ControllerServiceCapability cscap []*csi.ControllerServiceCapability
csPlugin interface{}
} }
const ( const (
driverName = "csi-nfsplugin" driverName = "csi-nfsplugin"
pluginSymbolName = "NfsPlugin"
) )
var ( var (
version = "1.0.0-rc2" version = "1.0.0-rc2"
) )
func NewNFSdriver(nodeID, endpoint, controllerPlugin string) *nfsDriver { func loadControllerPlugin(pluginName string) (interface{}, []csi.ControllerServiceCapability_RPC_Type, error) {
csc := []csi.ControllerServiceCapability_RPC_Type{}
if pluginName == "" {
csc = append(csc, csi.ControllerServiceCapability_RPC_UNKNOWN)
return nil, csc, nil
}
plug, err := plugin.Open(pluginName)
if err != nil {
glog.Infof("Failed to load plugin: %s error: %v", pluginName, err)
return nil, csc, err
}
csPlugin, err := plug.Lookup(pluginSymbolName)
if err != nil {
glog.Infof("Failed to lookup csPlugin: %s error: %v", pluginSymbolName, err)
return nil, csc, err
}
// Check if csPlugin implements each capability and add it to implenentation
if _, ok := csPlugin.(ControllerPlugin); !ok {
glog.Infof("Plugin doesn't implement mandatory methods for controller")
return nil, csc, fmt.Errorf("Plugin doesn't implement mandatory methods for controller")
}
if _, ok := csPlugin.(CreateDeleteVolumeControllerPlugin); ok {
csc = append(csc, csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME)
}
if _, ok := csPlugin.(PublishUnpublishVolumeControllerPlugin); ok {
csc = append(csc, csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME)
}
if _, ok := csPlugin.(ListVolumesControllerPlugin); ok {
csc = append(csc, csi.ControllerServiceCapability_RPC_LIST_VOLUMES)
}
if _, ok := csPlugin.(GetCapacityControllerPlugin); ok {
csc = append(csc, csi.ControllerServiceCapability_RPC_GET_CAPACITY)
}
if _, ok := csPlugin.(SnapshotControllerPlugin); ok {
csc = append(csc, csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT)
}
if _, ok := csPlugin.(ListSnapshotControllerPlugin); ok {
csc = append(csc, csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS)
}
if _, ok := csPlugin.(ExpandVolumeControllerPlugin); ok {
csc = append(csc, csi.ControllerServiceCapability_RPC_EXPAND_VOLUME)
}
// TODO: Need to handle clone volume and publish read only capability?
if len(csc) == 0 {
csc = append(csc, csi.ControllerServiceCapability_RPC_UNKNOWN)
}
return csPlugin, csc, nil
}
func NewNFSdriver(nodeID, endpoint, controllerPlugin string) (*nfsDriver, error) {
glog.Infof("Driver: %v version: %v", driverName, version) glog.Infof("Driver: %v version: %v", driverName, version)
n := &nfsDriver{ n := &nfsDriver{
name: driverName, name: driverName,
version: version, version: version,
nodeID: nodeID, nodeID: nodeID,
endpoint: endpoint, endpoint: endpoint,
controllerPlugin: controllerPlugin,
} }
n.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER}) n.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER})
glog.Infof("controllerPlugin: %s", n.controllerPlugin)
glog.Infof("CreateVolume: %v, DeleteVolume: %v", isSupported(n.controllerPlugin, "CreateVolume"), isSupported(n.controllerPlugin, "DeleteVolume"))
createVolume, _ := lookupSymbol(n.controllerPlugin, "CreateVolume")
deleteVolume, _ := lookupSymbol(n.controllerPlugin, "DeleteVolume")
glog.Infof("CreateVolume: %v, DeleteVolume: %v", createVolume, deleteVolume)
if isSupported(n.controllerPlugin, "CreateVolume") && isSupported(n.controllerPlugin, "DeleteVolume") { csPlugin, csc, err := loadControllerPlugin(controllerPlugin)
n.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME}) if err != nil {
} else { return nil, fmt.Errorf("Failed to load plugin %s: %v", controllerPlugin, err)
n.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{csi.ControllerServiceCapability_RPC_UNKNOWN})
} }
n.csPlugin = csPlugin
n.AddControllerServiceCapabilities(csc)
return n return n, nil
} }
func NewNodeServer(n *nfsDriver) *nodeServer { func NewNodeServer(n *nfsDriver) *nodeServer {
@ -80,8 +143,6 @@ func (n *nfsDriver) Run() {
s := NewNonBlockingGRPCServer() s := NewNonBlockingGRPCServer()
s.Start(n.endpoint, s.Start(n.endpoint,
NewDefaultIdentityServer(n), NewDefaultIdentityServer(n),
// NFS plugin has not implemented ControllerServer
// using default controllerserver.
NewControllerServer(n), NewControllerServer(n),
NewNodeServer(n)) NewNodeServer(n))
s.Wait() s.Wait()

58
pkg/nfs/types.go Normal file
View File

@ -0,0 +1,58 @@
package nfs
import (
"github.com/container-storage-interface/spec/lib/go/csi"
"golang.org/x/net/context"
)
// ControllerPlugin is an interface for controller that implements minimum set of controller methods
type ControllerPlugin interface {
// TODO: Consider ControllerGetCapabilities needs to be implemented depend on plugins
//ControllerGetCapabilities(ctx context.Context, cs *ControllerServer, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error)
ValidateVolumeCapabilities(ctx context.Context, cs *ControllerServer, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error)
}
// CreateDeleteVolumeControllerPlugin is an interface for controller that implements csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME
type CreateDeleteVolumeControllerPlugin interface {
ControllerPlugin
CreateVolume(ctx context.Context, cs *ControllerServer, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error)
DeleteVolume(ctx context.Context, cs *ControllerServer, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error)
}
// PublishUnpublishVolumeControllerPlugin is an interface for controller that implements csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME
type PublishUnpublishVolumeControllerPlugin interface {
ControllerPlugin
ControllerPublishVolume(ctx context.Context, cs *ControllerServer, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error)
ControllerUnpublishVolume(ctx context.Context, cs *ControllerServer, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error)
}
// ListVolumesControllerPlugin is an interface for controller that implements csi.ControllerServiceCapability_RPC_LIST_VOLUMES
type ListVolumesControllerPlugin interface {
ControllerPlugin
ListVolumes(ctx context.Context, cs *ControllerServer, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error)
}
// GetCapacityControllerPlugin is an interface for controller that implements csi.ControllerServiceCapability_RPC_GET_CAPACITY
type GetCapacityControllerPlugin interface {
ControllerPlugin
GetCapacity(ctx context.Context, cs *ControllerServer, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error)
}
// SnapshotControllerPlugin is an interface for controller that implements csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT
type SnapshotControllerPlugin interface {
ControllerPlugin
CreateSnapshot(ctx context.Context, cs *ControllerServer, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error)
DeleteSnapshot(ctx context.Context, cs *ControllerServer, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error)
}
// ListSnapshotControllerPlugin is an interface for controller that implements csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS
type ListSnapshotControllerPlugin interface {
ControllerPlugin
ListSnapshots(ctx context.Context, cs *ControllerServer, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error)
}
// ExpandVolumeControllerPlugin is an interface for controller that implements csi.ControllerServiceCapability_RPC_EXPAND_VOLUME
type ExpandVolumeControllerPlugin interface {
ControllerPlugin
ControllerExpandVolume(ctx context.Context, cs *ControllerServer, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error)
}

View File

@ -24,7 +24,17 @@ const (
mountPathBase = "/csi-nfs-volume" mountPathBase = "/csi-nfs-volume"
) )
func CreateVolume(cs *nfs.ControllerServer, ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { // csPlugin is an implementation of ControllerPlugin
type csPlugin struct {
name string
}
var _ nfs.ControllerPlugin = csPlugin{}
var _ nfs.CreateDeleteVolumeControllerPlugin = csPlugin{}
var NfsPlugin = csPlugin{"NfsPlugin"}
// CreateVolume is an implenetaiton that is required by CreateDeleteVolumeControllerPlugin interface
func (p csPlugin) CreateVolume(ctx context.Context, cs *nfs.ControllerServer, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
glog.Infof("plugin.CreateVolume called") glog.Infof("plugin.CreateVolume called")
var volSize int64 var volSize int64
if req.GetCapacityRange() != nil { if req.GetCapacityRange() != nil {
@ -68,7 +78,8 @@ func CreateVolume(cs *nfs.ControllerServer, ctx context.Context, req *csi.Create
}, nil }, nil
} }
func DeleteVolume(cs *nfs.ControllerServer, ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { // DeleteVolume is an implenetaiton that is required by CreateDeleteVolumeControllerPlugin interface
func (p csPlugin) DeleteVolume(ctx context.Context, cs *nfs.ControllerServer, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
glog.Infof("plugin.DeleteVolume called") glog.Infof("plugin.DeleteVolume called")
volumeID := req.GetVolumeId() volumeID := req.GetVolumeId()
if volumeID == "" { if volumeID == "" {
@ -106,7 +117,8 @@ func DeleteVolume(cs *nfs.ControllerServer, ctx context.Context, req *csi.Delete
return &csi.DeleteVolumeResponse{}, nil return &csi.DeleteVolumeResponse{}, nil
} }
func ValidateVolumeCapabilities(cs *nfs.ControllerServer, ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { // ValidateVolumeCapabilities is an implenetaiton that is required by ControllerPlugin interface
func (p csPlugin) ValidateVolumeCapabilities(ctx context.Context, cs *nfs.ControllerServer, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
if req.GetVolumeId() == "" { if req.GetVolumeId() == "" {
return nil, status.Error(codes.InvalidArgument, "Empty volume ID in request") return nil, status.Error(codes.InvalidArgument, "Empty volume ID in request")
} }