forked from pytorch/pytorch
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_accelerator.py
73 lines (60 loc) · 2.79 KB
/
test_accelerator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# Owner(s): ["module: tests"]
import sys
import unittest
import torch
from torch.testing._internal.common_utils import NoTest, run_tests, TestCase
# Without any available accelerator, report the skip on stderr and rebind
# TestCase to NoTest, so the test class defined below derives from NoTest.
# NOTE(review): presumably NoTest keeps the suite from running any tests --
# confirm against torch.testing._internal.common_utils.
if not torch.accelerator.is_available():
    print("No available accelerator detected, skipping tests", file=sys.stderr)
    TestCase = NoTest  # noqa: F811

# True when more than one accelerator device is reported; the multi-device
# test is gated on this flag through unittest.skipIf.
TEST_MULTIACCELERATOR = torch.accelerator.device_count() > 1
class TestAccelerator(TestCase):
    """Checks of the device-agnostic ``torch.accelerator`` API.

    Covers current-accelerator reporting, switching the current device
    index, and stream/event handling through ``torch.Stream`` and
    ``torch.Event``.
    """

    def test_current_accelerator(self):
        # An accelerator has to be reported as available for this to pass.
        self.assertTrue(torch.accelerator.is_available())

        for backend in ("cuda", "xpu", "mps"):
            if not torch.get_device_module(backend).is_available():
                continue

            # For each available backend, the current accelerator must report
            # that device type and carry no device index.
            self.assertEqual(torch.accelerator.current_accelerator().type, backend)
            self.assertIsNone(torch.accelerator.current_accelerator().index)

            # Passing "cpu" to set_device_idx is expected to raise ValueError.
            with self.assertRaisesRegex(
                ValueError, "doesn't match the current accelerator"
            ):
                torch.accelerator.set_device_idx("cpu")

    @unittest.skipIf(not TEST_MULTIACCELERATOR, "only one accelerator detected")
    def test_generic_multi_device_behavior(self):
        home_idx = torch.accelerator.current_device_idx()
        device_total = torch.accelerator.device_count()
        # Next device index, wrapping back to 0 past the last device.
        other_idx = (home_idx + 1) % device_total

        # Switch away and back; the reported index must follow each call.
        torch.accelerator.set_device_idx(other_idx)
        self.assertEqual(other_idx, torch.accelerator.current_device_idx())
        torch.accelerator.set_device_idx(home_idx)
        self.assertEqual(home_idx, torch.accelerator.current_device_idx())

        # Making a stream built for the other device current is expected to
        # move the current device index to that device.
        other_stream = torch.Stream(other_idx)
        torch.accelerator.set_stream(other_stream)
        self.assertEqual(other_idx, torch.accelerator.current_device_idx())

        # Synchronizing the home device must leave the current device as is.
        torch.accelerator.synchronize(home_idx)
        self.assertEqual(other_idx, torch.accelerator.current_device_idx())

    def test_generic_stream_behavior(self):
        first_stream = torch.Stream()
        second_stream = torch.Stream()

        torch.accelerator.set_stream(first_stream)
        self.assertEqual(torch.accelerator.current_stream(), first_stream)

        marker = torch.Event()

        # Inputs created without a device argument, plus the reference sum
        # the accelerator result is compared against.
        lhs = torch.randn(100)
        rhs = torch.randn(100)
        expected = lhs + rhs

        # The non-blocking copies are issued while second_stream is current.
        torch.accelerator.set_stream(second_stream)
        self.assertEqual(torch.accelerator.current_stream(), second_stream)
        lhs_acc = lhs.to(torch.accelerator.current_accelerator(), non_blocking=True)
        rhs_acc = rhs.to(torch.accelerator.current_accelerator(), non_blocking=True)

        torch.accelerator.set_stream(first_stream)
        self.assertEqual(torch.accelerator.current_stream(), first_stream)

        # Record on second_stream and wait on the event before the copied
        # tensors are consumed while first_stream is current.
        marker.record(second_stream)
        marker.synchronize()
        actual_acc = lhs_acc + rhs_acc

        # After a full synchronize the re-recorded event must report complete.
        marker.record(second_stream)
        torch.accelerator.synchronize()
        self.assertTrue(marker.query())
        self.assertEqual(actual_acc.cpu(), expected)
# When executed as a script, hand control to the run_tests helper imported
# from torch.testing._internal.common_utils.
if __name__ == "__main__":
    run_tests()