Skip to content

Commit

Permalink
Consolidate bulk read for readonly and readwrite devices.
Browse files Browse the repository at this point in the history
  • Loading branch information
MatthewHink committed Sep 1, 2020
1 parent 7e195fe commit d9f3830
Show file tree
Hide file tree
Showing 8 changed files with 1,042 additions and 41 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

PLUGIN_NAME := modbus-ip
PLUGIN_VERSION := 2.0.7
PLUGIN_VERSION := 2.0.8
IMAGE_NAME := vaporio/modbus-ip-plugin
BIN_NAME := synse-modbus-ip-plugin

Expand Down
27 changes: 21 additions & 6 deletions pkg/devices/coils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,20 @@ var CoilsHandler = sdk.DeviceHandler{
// that only read from coils.
var ReadOnlyCoilsHandler = sdk.DeviceHandler{
Name: "read_only_coil",
BulkRead: bulkReadCoils,
BulkRead: bulkReadReadOnlyCoils,
}

// bulkReadCoils performs a bulk read on the devices parameter reducing round trips.
func bulkReadCoils(devices []*sdk.Device) (readContexts []*sdk.ReadContext, err error) {

log.Debugf("----------- bulkReadCoils start ---------------")

// Ideally this would be done in setup, but for now this should work.
// Map out the bulk read.
bulkReadMap, keyOrder, err := MapBulkRead(devices, true)
// Call SetupBulkRead in case it's not setup, then get the bulk read map for coils.
SetupBulkRead()
bulkReadMap, keyOrder, err := GetBulkReadMap("coil")
if err != nil {
return nil, err
return
}
log.Debugf("bulkReadMap: %#v", bulkReadMap)

// Perform the bulk reads.
for a := 0; a < len(keyOrder); a++ {
Expand Down Expand Up @@ -79,6 +78,22 @@ func bulkReadCoils(devices []*sdk.Device) (readContexts []*sdk.ReadContext, err
return
}

// bulkReadReadOnlyCoils is a noop unless only read only coils are defined and
// no read/write coils are defined.
func bulkReadReadOnlyCoils(devices []*sdk.Device) (readContexts []*sdk.ReadContext, err error) {
SetupBulkRead()
var shortedOut bool
shortedOut, err = GetCoilsShortedOut()
if err != nil {
return
}
if !shortedOut {
// We need to call bulk read here because no read/write coils are defined.
return bulkReadCoils(devices)
}
return
}

// writeCoils is the read function for the coils device handler.
func writeCoils(device *sdk.Device, data *sdk.WriteData) (err error) {

Expand Down
243 changes: 234 additions & 9 deletions pkg/devices/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ func GetBulkReadClient(k ModbusBulkReadKey) (client modbus.Client, modbusDeviceD
Port: k.Port,
Timeout: k.Timeout,
FailOnError: k.FailOnError,
// Omitting SlaveID for now. Not currently used.
// TODO: Above belongs here as well as in ModbusBulkReadKey.
SlaveID: k.SlaveID,
}
log.Debugf("modbusDeviceData: %#v", modbusDeviceData)
client, err = utils.NewClient(modbusDeviceData)
Expand Down Expand Up @@ -118,6 +117,8 @@ type ModbusBulkReadKey struct {
Timeout string
// Fail on error. (Do we abort on one failed read?)
FailOnError bool
// SlaveID is the modbus slave address which is not normally used in modbus over TCP.
SlaveID int
// Maximum number of registers to read on a single modbus call to the device.
MaximumRegisterCount uint16
}
Expand Down Expand Up @@ -339,6 +340,23 @@ func MapBulkRead(devices []*sdk.Device, isCoil bool) (
return bulkReadMap, keyOrder, nil
}

// DumpBulkReadMap dumps the map in key order to the log at Info.
func DumpBulkReadMap(bulkReadMap map[ModbusBulkReadKey][]*ModbusBulkRead, keyOrder []ModbusBulkReadKey) {

for a := 0; a < len(keyOrder); a++ {
k := keyOrder[a]
v := bulkReadMap[k]

// Dump the key.
log.Infof("%#v", k)

// Dump each read in v.
for i := 0; i < len(v); i++ {
log.Infof(" %d: StartRegister: %d, RegisterCount: %d", i, v[i].StartRegister, v[i].RegisterCount)
}
}
}

// MapBulkReadData maps the data read over modbus to the device read contexts.
func MapBulkReadData(bulkReadMap map[ModbusBulkReadKey][]*ModbusBulkRead, keyOrder []ModbusBulkReadKey) (
readContexts []*sdk.ReadContext, err error) {
Expand Down Expand Up @@ -444,31 +462,238 @@ func MapBulkReadData(bulkReadMap map[ModbusBulkReadKey][]*ModbusBulkRead, keyOrd
// ModbusCallCounter is for testing. We increment it once after each network round
// trip with any modbus server.
var modbusCallCounter uint64
var mutex sync.Mutex
var callCounterMutex sync.Mutex

// GetModbusCallCounter gets the number of modbus calls to any modbus server.
func GetModbusCallCounter() (counter uint64) {
mutex.Lock()
callCounterMutex.Lock()
counter = modbusCallCounter
mutex.Unlock()
callCounterMutex.Unlock()
return
}

// ResetModbusCallCounter resets the counter to zero for test purposes.
func ResetModbusCallCounter() {
mutex.Lock()
callCounterMutex.Lock()
modbusCallCounter = 0
mutex.Unlock()
callCounterMutex.Unlock()
}

// incrementModbusCallCounter is called internally whenever a modbus request is
// made to any modbus server.
func incrementModbusCallCounter() {
mutex.Lock()
callCounterMutex.Lock()
if modbusCallCounter == math.MaxUint64 {
modbusCallCounter = 0 // roll over
} else {
modbusCallCounter++
}
mutex.Unlock()
callCounterMutex.Unlock()
}

// bulkReadManager aggregates devices for bulk read.
type bulkReadManager struct {
devices []*sdk.Device // A slice of all devices.
coilDevices []*sdk.Device // A slice of all coil devices.
holdingDevices []*sdk.Device // A slice of all holding register devices.
inputDevices []*sdk.Device // A slice of all input register devices.
setupCompleted bool // true once all setup is completed and we can perform bulk reads.

coilBulkReadMap map[ModbusBulkReadKey][]*ModbusBulkRead // Mapped bulk reads for coils.
coilKeyOrder []ModbusBulkReadKey // Order of the keys to traverse the coilBulkReadMap.

holdingBulkReadMap map[ModbusBulkReadKey][]*ModbusBulkRead // Mapped bulk reads for holding registers.
holdingKeyOrder []ModbusBulkReadKey // Order of the keys to traverse the holdingBulkReadMap.

inputBulkReadMap map[ModbusBulkReadKey][]*ModbusBulkRead // Mapped bulk reads for input registers.
inputKeyOrder []ModbusBulkReadKey // Order of the keys to traverse the inputBulkReadMap.

// true to make the read only coils bulk read a noop.
// This will be false unless there are only read only coils and no read/write coils.
shortOutReadOnlyCoil bool

// true to make the read only holding registers bulk read a noop.
// This will be false unless there are only read only holding registers and no read/write holding registers.
shortOutReadOnlyHolding bool
}

// addModbusDevice adds a sdk.Device to a bulkReadManager.
func (brm *bulkReadManager) addModbusDevice(d *sdk.Device) (err error) {
// Error check.
if brm == nil {
return fmt.Errorf("brm is nil")
}
if d == nil {
return fmt.Errorf("d is nil")
}

// Append to the device slices.
brm.devices = append(brm.devices, d)

if d.Handler == "coil" || d.Handler == "read_only_coil" {
if d.Handler == "coil" {
brm.shortOutReadOnlyCoil = true
}
brm.coilDevices = append(brm.coilDevices, d)
return
}

if d.Handler == "holding_register" || d.Handler == "read_only_holding_register" {
if d.Handler == "holding_register" {
brm.shortOutReadOnlyHolding = true
}
brm.holdingDevices = append(brm.holdingDevices, d)
return
}

if d.Handler == "input_register" {
brm.inputDevices = append(brm.inputDevices, d)
return
}

return fmt.Errorf("Unknown device handler %s", d.Handler)
}

// bulkReadSetupMutex puts a critical section around bulkReadManager.setup() so
// that bulk read calls scheduled in parallel do not collide.
var bulkReadSetupMutex sync.Mutex

// setup sets up the manager for bulk read. If the manager is already setup, this is a noop.
func (brm *bulkReadManager) setup() (err error) {
// Error check.
if brm == nil {
return fmt.Errorf("brm is nil")
}

bulkReadSetupMutex.Lock()
if brm.setupCompleted {
bulkReadSetupMutex.Unlock()
return
}

log.Infof("Setting up bulk read")

// Map out the bulk reads for coils.
brm.coilBulkReadMap, brm.coilKeyOrder, err = MapBulkRead(brm.coilDevices, true)
if err != nil {
bulkReadSetupMutex.Unlock()
return
}
log.Info("coilBulkReadMap:")
DumpBulkReadMap(brm.coilBulkReadMap, brm.coilKeyOrder)

// Map out the bulk reads for holding registers.
brm.holdingBulkReadMap, brm.holdingKeyOrder, err = MapBulkRead(brm.holdingDevices, false)
if err != nil {
bulkReadSetupMutex.Unlock()
return
}
log.Info("holdingBulkReadMap:")
DumpBulkReadMap(brm.holdingBulkReadMap, brm.holdingKeyOrder)

// Map out the bulk reads for input registers.
brm.inputBulkReadMap, brm.inputKeyOrder, err = MapBulkRead(brm.inputDevices, false)
if err != nil {
bulkReadSetupMutex.Unlock()
return
}
log.Info("inputBulkReadMap:")
DumpBulkReadMap(brm.inputBulkReadMap, brm.inputKeyOrder)

brm.setupCompleted = true
log.Infof("Bulk read setup completed")

log.Infof("shortOutReadOnlyCoil: %v\n", brm.shortOutReadOnlyCoil)
log.Infof("shortOutReadOnlyHolding: %v\n", brm.shortOutReadOnlyHolding)
bulkReadSetupMutex.Unlock()
return
}

// GetBulkReadMap get the bulk read map and key order for the given mapId.
// Valid mapIds are coil, holding, input.
func (brm *bulkReadManager) GetBulkReadMap(mapID string) (
bulkReadMap map[ModbusBulkReadKey][]*ModbusBulkRead, keyOrder []ModbusBulkReadKey, err error) {

if brm == nil {
err = fmt.Errorf("brm is nil")
return
}

if mapID == "coil" {
return brm.coilBulkReadMap, brm.coilKeyOrder, nil
}

if mapID == "holding" {
return brm.holdingBulkReadMap, brm.holdingKeyOrder, nil
}

if mapID == "input" {
return brm.inputBulkReadMap, brm.inputKeyOrder, nil
}

err = fmt.Errorf("Unknown mapId %s", mapID)
return
}

// GetCoilsShortedOut returns true if BulkReadReadOnlyCoils should be a no-op.
func (brm *bulkReadManager) GetCoilsShortedOut() (shortedOut bool, err error) {
if brm == nil {
err = fmt.Errorf("brm is nil")
return
}
return brm.shortOutReadOnlyCoil, nil
}

// GetHoldingShortedOut returns true if BulkReadReadOnlyHoldingRegisters should be a no-op.
func (brm *bulkReadManager) GetHoldingShortedOut() (shortedOut bool, err error) {
if brm == nil {
err = fmt.Errorf("brm is nil")
return
}
return brm.shortOutReadOnlyHolding, nil
}

// brManager is a file level global that aggregates devices for bulk read.
var brManager bulkReadManager

// AddModbusDevice runs once during plugin initialization for each synse modbus device.
func AddModbusDevice(p *sdk.Plugin, d *sdk.Device) (err error) {
return brManager.addModbusDevice(d)
}

// SetupBulkRead sets up the bulk read manager for bulk reads.
// If setup is already done, this is a noop.
func SetupBulkRead() {
brManager.setup()
}

// GetBulkReadMap get the bulk read map and key order for the given mapId.
// Valid mapIds are coil, holding, input.
func GetBulkReadMap(mapID string) (
bulkReadMap map[ModbusBulkReadKey][]*ModbusBulkRead, keyOrder []ModbusBulkReadKey, err error) {
return brManager.GetBulkReadMap(mapID)
}

// GetCoilsShortedOut returns true if BulkReadReadOnlyCoils should be a no-op.
func GetCoilsShortedOut() (shortedOut bool, err error) {
return brManager.GetCoilsShortedOut()
}

// GetHoldingShortedOut returns true if BulkReadReadOnlyCoils should be a no-op.
func GetHoldingShortedOut() (shortedOut bool, err error) {
return brManager.GetHoldingShortedOut()
}

// PurgeBulkReadManager is a test only function to reset brManager.
func PurgeBulkReadManager() {
log.Warn("Purging bulk read manager")
brManager = bulkReadManager{}
}

// OnModbusDeviceLoad is a setup action which is called once per modbus device.
// This adds each synse modbus device to the bulkReadManager.
var OnModbusDeviceLoad = sdk.DeviceAction{
Name: "modbus-device-load",
Filter: map[string][]string{"type": {"*"}}, // All devices
Action: AddModbusDevice,
}
Loading

0 comments on commit d9f3830

Please sign in to comment.