From cc481ceade71ceda0d1b1bfc2b247cdfe3617050 Mon Sep 17 00:00:00 2001 From: Mathusan Selvarajah Date: Thu, 30 May 2019 16:42:36 -0400 Subject: [PATCH] remove csi-common dependencies --- cmd/nfsplugin/main.go | 2 +- pkg/nfs/controllerserver.go | 58 +++++++++++++++++--- pkg/nfs/driver.go | 78 --------------------------- pkg/nfs/indentityserver.go | 49 +++++++++++++++++ pkg/nfs/nfs.go | 102 ++++++++++++++++++++++++++++++++++++ pkg/nfs/nodeserver.go | 32 ++++++++++- pkg/nfs/server.go | 96 +++++++++++++++++++++++++++++++++ pkg/nfs/utils.go | 55 +++++++++++++++++++ 8 files changed, 384 insertions(+), 88 deletions(-) delete mode 100644 pkg/nfs/driver.go create mode 100644 pkg/nfs/indentityserver.go create mode 100644 pkg/nfs/nfs.go create mode 100644 pkg/nfs/server.go create mode 100644 pkg/nfs/utils.go diff --git a/cmd/nfsplugin/main.go b/cmd/nfsplugin/main.go index e3865479..f757df0b 100644 --- a/cmd/nfsplugin/main.go +++ b/cmd/nfsplugin/main.go @@ -65,6 +65,6 @@ func main() { } func handle() { - d := nfs.NewDriver(nodeID, endpoint) + d := nfs.NewNFSdriver(nodeID, endpoint) d.Run() } diff --git a/pkg/nfs/controllerserver.go b/pkg/nfs/controllerserver.go index 67267d74..0bbbb423 100644 --- a/pkg/nfs/controllerserver.go +++ b/pkg/nfs/controllerserver.go @@ -2,22 +2,66 @@ package nfs import ( "github.com/container-storage-interface/spec/lib/go/csi" - "github.com/kubernetes-csi/drivers/pkg/csi-common" + "github.com/golang/glog" "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) type ControllerServer struct { - *csicommon.DefaultControllerServer + Driver *nfsDriver } -func (cs ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { +func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { return nil, status.Error(codes.Unimplemented, "") } -func getControllerServer(csiDriver *csicommon.CSIDriver) ControllerServer { - return ControllerServer{ - csicommon.NewDefaultControllerServer(csiDriver), - } +func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +func (cs *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +func (cs *ControllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +func (cs *ControllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +// ControllerGetCapabilities implements the default GRPC callout. +// Default supports all capabilities +func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) { + glog.V(5).Infof("Using default ControllerGetCapabilities") + + return &csi.ControllerGetCapabilitiesResponse{ + Capabilities: cs.Driver.cscap, + }, nil +} + +func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { + return nil, status.Error(codes.Unimplemented, "") } diff --git a/pkg/nfs/driver.go b/pkg/nfs/driver.go deleted file mode 100644 index 0cb47e7a..00000000 --- a/pkg/nfs/driver.go +++ /dev/null @@ -1,78 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package nfs - -import ( - "github.com/container-storage-interface/spec/lib/go/csi" - "github.com/golang/glog" - - csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common" -) - -type driver struct { - csiDriver *csicommon.CSIDriver - endpoint string - - //ids *identityServer - ns *nodeServer - cap []*csi.VolumeCapability_AccessMode - cscap []*csi.ControllerServiceCapability -} - -const ( - driverName = "csi-nfsplugin" -) - -var ( - version = "1.0.0-rc2" -) - -func NewDriver(nodeID, endpoint string) *driver { - glog.Infof("Driver: %v version: %v", driverName, version) - - d := &driver{} - - d.endpoint = endpoint - - csiDriver := csicommon.NewCSIDriver(driverName, version, nodeID) - csiDriver.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER}) - // NFS plugin does not support ControllerServiceCapability now. - // If support is added, it should set to appropriate - // ControllerServiceCapability RPC types. - csiDriver.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{csi.ControllerServiceCapability_RPC_UNKNOWN}) - - d.csiDriver = csiDriver - - return d -} - -func NewNodeServer(d *driver) *nodeServer { - return &nodeServer{ - DefaultNodeServer: csicommon.NewDefaultNodeServer(d.csiDriver), - } -} - -func (d *driver) Run() { - s := csicommon.NewNonBlockingGRPCServer() - s.Start(d.endpoint, - csicommon.NewDefaultIdentityServer(d.csiDriver), - // NFS plugin has not implemented ControllerServer - // using default controllerserver. - getControllerServer(d.csiDriver), - NewNodeServer(d)) - s.Wait() -} diff --git a/pkg/nfs/indentityserver.go b/pkg/nfs/indentityserver.go new file mode 100644 index 00000000..af631b1c --- /dev/null +++ b/pkg/nfs/indentityserver.go @@ -0,0 +1,49 @@ +package nfs + +import ( + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/golang/glog" + "golang.org/x/net/context" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type IdentityServer struct { + Driver *nfsDriver +} + +func (ids *IdentityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) { + glog.V(5).Infof("Using default GetPluginInfo") + + if ids.Driver.name == "" { + return nil, status.Error(codes.Unavailable, "Driver name not configured") + } + + if ids.Driver.version == "" { + return nil, status.Error(codes.Unavailable, "Driver is missing version") + } + + return &csi.GetPluginInfoResponse{ + Name: ids.Driver.name, + VendorVersion: ids.Driver.version, + }, nil +} + +func (ids *IdentityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) { + return &csi.ProbeResponse{}, nil +} + +func (ids *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) { + glog.V(5).Infof("Using default capabilities") + return &csi.GetPluginCapabilitiesResponse{ + Capabilities: []*csi.PluginCapability{ + { + Type: &csi.PluginCapability_Service_{ + Service: &csi.PluginCapability_Service{ + Type: csi.PluginCapability_Service_CONTROLLER_SERVICE, + }, + }, + }, + }, + }, nil +} diff --git a/pkg/nfs/nfs.go b/pkg/nfs/nfs.go new file mode 100644 index 00000000..c5378337 --- /dev/null +++ b/pkg/nfs/nfs.go @@ -0,0 +1,102 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nfs + +import ( + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/golang/glog" +) + +type nfsDriver struct { + name string + nodeID string + version string + + endpoint string + + //ids *identityServer + ns *nodeServer + cap []*csi.VolumeCapability_AccessMode + cscap []*csi.ControllerServiceCapability +} + +const ( + driverName = "csi-nfsplugin" +) + +var ( + version = "1.0.0-rc2" +) + +func NewNFSdriver(nodeID, endpoint string) *nfsDriver { + glog.Infof("Driver: %v version: %v", driverName, version) + + n := &nfsDriver{ + name: driverName, + version: version, + nodeID: nodeID, + endpoint: endpoint, + } + + n.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER}) + // NFS plugin does not support ControllerServiceCapability now. + // If support is added, it should set to appropriate + // ControllerServiceCapability RPC types. + n.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{csi.ControllerServiceCapability_RPC_UNKNOWN}) + + return n +} + +func NewNodeServer(n *nfsDriver) *nodeServer { + return &nodeServer{ + Driver: n, + } +} + +func (n *nfsDriver) Run() { + s := NewNonBlockingGRPCServer() + s.Start(n.endpoint, + NewDefaultIdentityServer(n), + // NFS plugin has not implemented ControllerServer + // using default controllerserver. + NewControllerServer(n), + NewNodeServer(n)) + s.Wait() +} + +func (n *nfsDriver) AddVolumeCapabilityAccessModes(vc []csi.VolumeCapability_AccessMode_Mode) []*csi.VolumeCapability_AccessMode { + var vca []*csi.VolumeCapability_AccessMode + for _, c := range vc { + glog.Infof("Enabling volume access mode: %v", c.String()) + vca = append(vca, &csi.VolumeCapability_AccessMode{Mode: c}) + } + n.cap = vca + return vca +} + +func (n *nfsDriver) AddControllerServiceCapabilities(cl []csi.ControllerServiceCapability_RPC_Type) { + var csc []*csi.ControllerServiceCapability + + for _, c := range cl { + glog.Infof("Enabling controller service capability: %v", c.String()) + csc = append(csc, NewControllerServiceCapability(c)) + } + + n.cscap = csc + + return +} diff --git a/pkg/nfs/nodeserver.go b/pkg/nfs/nodeserver.go index a58eff26..750cd4a1 100644 --- a/pkg/nfs/nodeserver.go +++ b/pkg/nfs/nodeserver.go @@ -18,11 +18,11 @@ package nfs import ( "fmt" + "github.com/golang/glog" "os" "strings" "github.com/container-storage-interface/spec/lib/go/csi" - "github.com/kubernetes-csi/drivers/pkg/csi-common" "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -30,7 +30,7 @@ import ( ) type nodeServer struct { - *csicommon.DefaultNodeServer + Driver *nfsDriver } func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { @@ -98,6 +98,34 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu return &csi.NodeUnpublishVolumeResponse{}, nil } +func (ns *nodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) { + glog.V(5).Infof("Using default NodeGetInfo") + + return &csi.NodeGetInfoResponse{ + NodeId: ns.Driver.nodeID, + }, nil +} + +func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { + glog.V(5).Infof("Using default NodeGetCapabilities") + + return &csi.NodeGetCapabilitiesResponse{ + Capabilities: []*csi.NodeServiceCapability{ + { + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_UNKNOWN, + }, + }, + }, + }, + }, nil +} + +func (ns *nodeServer) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { return &csi.NodeUnstageVolumeResponse{}, nil } diff --git a/pkg/nfs/server.go b/pkg/nfs/server.go new file mode 100644 index 00000000..7b2ea275 --- /dev/null +++ b/pkg/nfs/server.go @@ -0,0 +1,96 @@ +package nfs + +import ( + "net" + "os" + "sync" + + "github.com/golang/glog" + "google.golang.org/grpc" + + "github.com/container-storage-interface/spec/lib/go/csi" +) + +// Defines Non blocking GRPC server interfaces +type NonBlockingGRPCServer interface { + // Start services at the endpoint + Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) + // Waits for the service to stop + Wait() + // Stops the service gracefully + Stop() + // Stops the service forcefully + ForceStop() +} + +func NewNonBlockingGRPCServer() NonBlockingGRPCServer { + return &nonBlockingGRPCServer{} +} + +// NonBlocking server +type nonBlockingGRPCServer struct { + wg sync.WaitGroup + server *grpc.Server +} + +func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) { + + s.wg.Add(1) + + go s.serve(endpoint, ids, cs, ns) + + return +} + +func (s *nonBlockingGRPCServer) Wait() { + s.wg.Wait() +} + +func (s *nonBlockingGRPCServer) Stop() { + s.server.GracefulStop() +} + +func (s *nonBlockingGRPCServer) ForceStop() { + s.server.Stop() +} + +func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) { + + proto, addr, err := ParseEndpoint(endpoint) + if err != nil { + glog.Fatal(err.Error()) + } + + if proto == "unix" { + addr = "/" + addr + if err := os.Remove(addr); err != nil && !os.IsNotExist(err) { + glog.Fatalf("Failed to remove %s, error: %s", addr, err.Error()) + } + } + + listener, err := net.Listen(proto, addr) + if err != nil { + glog.Fatalf("Failed to listen: %v", err) + } + + opts := []grpc.ServerOption{ + grpc.UnaryInterceptor(logGRPC), + } + server := grpc.NewServer(opts...) + s.server = server + + if ids != nil { + csi.RegisterIdentityServer(server, ids) + } + if cs != nil { + csi.RegisterControllerServer(server, cs) + } + if ns != nil { + csi.RegisterNodeServer(server, ns) + } + + glog.Infof("Listening for connections on address: %#v", listener.Addr()) + + server.Serve(listener) + +} diff --git a/pkg/nfs/utils.go b/pkg/nfs/utils.go new file mode 100644 index 00000000..63e13f8a --- /dev/null +++ b/pkg/nfs/utils.go @@ -0,0 +1,55 @@ +package nfs + +import ( + "fmt" + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/golang/glog" + "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" + "golang.org/x/net/context" + "google.golang.org/grpc" + "strings" +) + +func NewDefaultIdentityServer(d *nfsDriver) *IdentityServer { + return &IdentityServer{ + Driver: d, + } +} + +func NewControllerServer(d *nfsDriver) *ControllerServer { + return &ControllerServer{ + Driver: d, + } +} + +func NewControllerServiceCapability(cap csi.ControllerServiceCapability_RPC_Type) *csi.ControllerServiceCapability { + return &csi.ControllerServiceCapability{ + Type: &csi.ControllerServiceCapability_Rpc{ + Rpc: &csi.ControllerServiceCapability_RPC{ + Type: cap, + }, + }, + } +} + +func ParseEndpoint(ep string) (string, string, error) { + if strings.HasPrefix(strings.ToLower(ep), "unix://") || strings.HasPrefix(strings.ToLower(ep), "tcp://") { + s := strings.SplitN(ep, "://", 2) + if s[1] != "" { + return s[0], s[1], nil + } + } + return "", "", fmt.Errorf("Invalid endpoint: %v", ep) +} + +func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + glog.V(3).Infof("GRPC call: %s", info.FullMethod) + glog.V(5).Infof("GRPC request: %s", protosanitizer.StripSecrets(req)) + resp, err := handler(ctx, req) + if err != nil { + glog.Errorf("GRPC error: %v", err) + } else { + glog.V(5).Infof("GRPC response: %s", protosanitizer.StripSecrets(resp)) + } + return resp, err +}