Skip to content

Commit

Permalink
[SDFAB-189] Add UPF programmable behaviour (#276)
Browse files Browse the repository at this point in the history
* Initial move of upf programmable

* Cleanup tests for upf programmable

* Fix checkstyle

* Fix pom file

* Add UPF behaviour when pipeconf requires it

Also, use do not manage UE limit on physical pipeline.

* Force to use local artifacs

* Fix read all counters usage and new tests

* Update full profile suffix.

* Make mocks more generic and independent from upf constants

* Add dependency to fabric v1model and remove redundant interfaces

* Add missing license headers

* Remove dependency to fabric-v1model

* Use different name for distributed structures

* Rename upf store interface name

* Build using snapshots

* Fix MockReadRequest

* Fix failure to find onos-build-conf

Co-authored-by: Carmelo Cascone <[email protected]>
  • Loading branch information
daniele-moro and ccascone authored Jun 18, 2021
1 parent bcbb0e7 commit 5dc6523
Show file tree
Hide file tree
Showing 27 changed files with 3,538 additions and 9 deletions.
27 changes: 19 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ SPDX-License-Identifier: LicenseRef-ONF-Member-Only-1.0
<parent>
<groupId>org.onosproject</groupId>
<artifactId>onos-dependencies</artifactId>
<version>2.5.2-b1</version>
<version>2.5.2-SNAPSHOT</version>
</parent>

<groupId>org.stratumproject</groupId>
Expand Down Expand Up @@ -40,22 +40,22 @@ SPDX-License-Identifier: LicenseRef-ONF-Member-Only-1.0
</dependency>

<dependency>
<groupId>${trellis.api.groupId}</groupId>
<artifactId>${trellis.api.artifactId}</artifactId>
<version>${trellis.api.version}</version>
<groupId>org.onosproject</groupId>
<artifactId>onos-drivers-p4runtime</artifactId>
<version>${onos.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-osgi</artifactId>
<version>${onos.version}</version>
<groupId>${trellis.api.groupId}</groupId>
<artifactId>${trellis.api.artifactId}</artifactId>
<version>${trellis.api.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-pipelines-fabric-api</artifactId>
<artifactId>onlab-osgi</artifactId>
<version>${onos.version}</version>
<scope>provided</scope>
</dependency>
Expand Down Expand Up @@ -181,6 +181,17 @@ SPDX-License-Identifier: LicenseRef-ONF-Member-Only-1.0
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>snapshots</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
<checksumPolicy>fail</checksumPolicy>
</snapshots>
</pluginRepository>
</pluginRepositories>

<distributionManagement>
<snapshotRepository>
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/org/stratumproject/fabric/tna/PipeconfLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import org.onosproject.core.CoreService;
import org.onosproject.net.behaviour.Pipeliner;
import org.onosproject.net.behaviour.upf.UpfProgrammable;
import org.onosproject.net.behaviour.inbandtelemetry.IntProgrammable;
import org.onosproject.net.pi.model.DefaultPiPipeconf;
import org.onosproject.net.pi.model.PiPipeconf;
Expand All @@ -26,6 +27,7 @@
import org.stratumproject.fabric.tna.behaviour.FabricIntProgrammable;
import org.stratumproject.fabric.tna.behaviour.FabricInterpreter;
import org.stratumproject.fabric.tna.behaviour.pipeliner.FabricPipeliner;
import org.stratumproject.fabric.tna.behaviour.upf.FabricUpfProgrammable;

import java.io.File;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -68,7 +70,8 @@ public class PipeconfLoader {
private static final String PIPELINE_CONFIG = "pipeline_config.pb.bin";

private static final String INT_PROFILE_SUFFIX = "-int";
private static final String FULL_PROFILE_SUFFIX = "-full";
private static final String UPF_PROFILE_SUFFIX = "-spgw";
private static final String FULL_PROFILE_SUFFIX = "-spgw-int";

@Activate
public void activate() {
Expand Down Expand Up @@ -132,6 +135,13 @@ private PiPipeconf buildPipeconfFromPath(String path) {
builder.addBehaviour(IntProgrammable.class, FabricIntProgrammable.class);
}

// Add UpfProgrammable behaviour for UPF-enabled profiles.
if (profile.endsWith(UPF_PROFILE_SUFFIX) ||
profile.endsWith(FULL_PROFILE_SUFFIX)) {
builder.addBehaviour(UpfProgrammable.class, FabricUpfProgrammable.class);
}


return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ public final class Constants {
public static final int DEFAULT_PW_TRANSPORT_VLAN = 4090;
public static final int PKT_IN_MIRROR_SESSION_ID = 0x210;

// UPF related constants
public static final int UPF_INTERFACE_ACCESS = 1;
public static final int UPF_INTERFACE_CORE = 2;
public static final int UPF_INTERFACE_DBUF = 3;

// hide default constructor
private Constants() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType.CPU_PORT_TXT;
import static org.slf4j.LoggerFactory.getLogger;
import static org.stratumproject.fabric.tna.behaviour.P4InfoConstants.FABRIC_INGRESS_SPGW_DOWNLINK_PDRS;

/**
* Representation of the capabilities of a given fabric-tna pipeconf.
Expand Down Expand Up @@ -83,6 +84,17 @@ public Optional<Integer> cpuPort() {
}
}

/**
* Returns true if the pipeconf supports UPF capabilities, false otherwise.
*
* @return boolean
*/
public boolean supportUpf() {
return pipeconf.pipelineModel()
.table(FABRIC_INGRESS_SPGW_DOWNLINK_PDRS)
.isPresent();
}

public boolean supportDoubleVlanTerm() {
// TODO: re-enable support for double-vlan
// FIXME: next_vlan has been moved to pre_next
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
// Copyright 2020-present Open Networking Foundation
// SPDX-License-Identifier: LicenseRef-ONF-Member-Only-1.0
package org.stratumproject.fabric.tna.behaviour.upf;

import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.Maps;
import org.onosproject.net.behaviour.upf.PacketDetectionRule;
import org.onlab.packet.Ip4Address;
import org.onlab.util.ImmutableByteSequence;
import org.onlab.util.KryoNamespace;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.DistributedSet;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static com.google.common.base.Preconditions.checkNotNull;

/**
* Distributed implementation of FabricUpfStore.
*/
// FIXME: this store is generic and not tied to a single device, should we have a store based on deviceId?
@Component(immediate = true, service = DistributedFabricUpfStore.class)
public final class DistributedFabricUpfStore implements FabricUpfStore {

private final Logger log = LoggerFactory.getLogger(getClass());

@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected StorageService storageService;

protected static final String FAR_ID_MAP_NAME = "fabric-upf-far-id-tna";
protected static final String BUFFER_FAR_ID_SET_NAME = "fabric-upf-buffer-far-id-tna";
protected static final String FAR_ID_UE_MAP_NAME = "fabric-upf-far-id-ue-tna";
protected static final KryoNamespace.Builder SERIALIZER = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(UpfRuleIdentifier.class);

// Mapping between scheduling priority ranges with Tofino priority queues
// i.e., default queues are 8 in Tofino
private static final BiMap<Integer, Integer> SCHEDULING_PRIORITY_MAP
= new ImmutableBiMap.Builder<Integer, Integer>()
// Highest scheduling priority for 3GPP is 1 and highest Tofino queue priority is 7
.put(1, 5)
.put(6, 4)
.put(7, 3)
.put(8, 2)
.put(9, 1)
.build();

// Distributed local FAR ID to global FAR ID mapping
protected ConsistentMap<UpfRuleIdentifier, Integer> farIdMap;
private MapEventListener<UpfRuleIdentifier, Integer> farIdMapListener;
// Local, reversed copy of farIdMapper for better reverse lookup performance
protected Map<Integer, UpfRuleIdentifier> reverseFarIdMap;
private int nextGlobalFarId = 1;

protected DistributedSet<UpfRuleIdentifier> bufferFarIds;
protected ConsistentMap<UpfRuleIdentifier, Set<Ip4Address>> farIdToUeAddrs;

@Activate
protected void activate() {
// Allow unit test to inject farIdMap here.
if (storageService != null) {
this.farIdMap = storageService.<UpfRuleIdentifier, Integer>consistentMapBuilder()
.withName(FAR_ID_MAP_NAME)
.withRelaxedReadConsistency()
.withSerializer(Serializer.using(SERIALIZER.build()))
.build();
this.bufferFarIds = storageService.<UpfRuleIdentifier>setBuilder()
.withName(BUFFER_FAR_ID_SET_NAME)
.withRelaxedReadConsistency()
.withSerializer(Serializer.using(SERIALIZER.build()))
.build().asDistributedSet();
this.farIdToUeAddrs = storageService.<UpfRuleIdentifier, Set<Ip4Address>>consistentMapBuilder()
.withName(FAR_ID_UE_MAP_NAME)
.withRelaxedReadConsistency()
.withSerializer(Serializer.using(SERIALIZER.build()))
.build();

}
farIdMapListener = new FarIdMapListener();
farIdMap.addListener(farIdMapListener);

reverseFarIdMap = Maps.newHashMap();
farIdMap.entrySet().forEach(entry -> reverseFarIdMap.put(entry.getValue().value(), entry.getKey()));

log.info("Started");
}

@Deactivate
protected void deactivate() {
farIdMap.removeListener(farIdMapListener);
farIdMap.destroy();
reverseFarIdMap.clear();

log.info("Stopped");
}

@Override
public void reset() {
farIdMap.clear();
reverseFarIdMap.clear();
bufferFarIds.clear();
farIdToUeAddrs.clear();
nextGlobalFarId = 0;
}

@Override
public Map<UpfRuleIdentifier, Integer> getFarIdMap() {
return Map.copyOf(farIdMap.asJavaMap());
}

@Override
public int globalFarIdOf(UpfRuleIdentifier farIdPair) {
int globalFarId = farIdMap.compute(farIdPair,
(k, existingId) -> {
return Objects.requireNonNullElseGet(existingId, () -> nextGlobalFarId++);
}).value();
log.info("{} translated to GlobalFarId={}", farIdPair, globalFarId);
return globalFarId;
}

@Override
public int globalFarIdOf(ImmutableByteSequence pfcpSessionId, int sessionLocalFarId) {
UpfRuleIdentifier farId = new UpfRuleIdentifier(pfcpSessionId, sessionLocalFarId);
return globalFarIdOf(farId);

}

@Override
public String queueIdOf(int schedulingPriority) {
return (SCHEDULING_PRIORITY_MAP.get(schedulingPriority)).toString();
}

@Override
public String schedulingPriorityOf(int queueId) {
return (SCHEDULING_PRIORITY_MAP.inverse().get(queueId)).toString();
}

@Override
public UpfRuleIdentifier localFarIdOf(int globalFarId) {
return reverseFarIdMap.get(globalFarId);
}

public void learnFarIdToUeAddrs(PacketDetectionRule pdr) {
UpfRuleIdentifier ruleId = UpfRuleIdentifier.of(pdr.sessionId(), pdr.farId());
farIdToUeAddrs.compute(ruleId, (k, set) -> {
if (set == null) {
set = new HashSet<>();
}
set.add(pdr.ueAddress());
return set;
});
}

@Override
public boolean isFarIdBuffering(UpfRuleIdentifier farId) {
checkNotNull(farId);
return bufferFarIds.contains(farId);
}

@Override
public void learBufferingFarId(UpfRuleIdentifier farId) {
checkNotNull(farId);
bufferFarIds.add(farId);
}

@Override
public void forgetBufferingFarId(UpfRuleIdentifier farId) {
checkNotNull(farId);
bufferFarIds.remove(farId);
}

@Override
public void forgetUeAddr(Ip4Address ueAddr) {
farIdToUeAddrs.keySet().forEach(
farId -> farIdToUeAddrs.computeIfPresent(farId, (farIdz, ueAddrs) -> {
ueAddrs.remove(ueAddr);
return ueAddrs;
}));
}

@Override
public Set<Ip4Address> ueAddrsOfFarId(UpfRuleIdentifier farId) {
return farIdToUeAddrs.getOrDefault(farId, Set.of()).value();
}

@Override
public Set<UpfRuleIdentifier> getBufferFarIds() {
return Set.copyOf(bufferFarIds);
}

@Override
public Map<UpfRuleIdentifier, Set<Ip4Address>> getFarIdToUeAddrs() {
return Map.copyOf(farIdToUeAddrs.asJavaMap());
}

// NOTE: FarIdMapListener is run on the same thread intentionally in order to ensure that
// reverseFarIdMap update always finishes right after farIdMap is updated
private class FarIdMapListener implements MapEventListener<UpfRuleIdentifier, Integer> {
@Override
public void event(MapEvent<UpfRuleIdentifier, Integer> event) {
switch (event.type()) {
case INSERT:
reverseFarIdMap.put(event.newValue().value(), event.key());
break;
case UPDATE:
reverseFarIdMap.remove(event.oldValue().value());
reverseFarIdMap.put(event.newValue().value(), event.key());
break;
case REMOVE:
reverseFarIdMap.remove(event.oldValue().value());
break;
default:
break;
}
}
}
}
Loading

0 comments on commit 5dc6523

Please sign in to comment.