forked from hhauer/ldap-bulk-change
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ldap-bulk-change.py
executable file
·181 lines (144 loc) · 6.01 KB
/
ldap-bulk-change.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#!/usr/bin/env python3
import os
import sys
import argparse
import configparser
import re
import logging
# http://pythonhosted.org/python3-ldap/
from ldap3 import Connection, Server, MODIFY_REPLACE
__author__ = 'hhauer'
logger = logging.getLogger(__name__)
# Start by setting up argparse
parser = argparse.ArgumentParser(description='Make a bulk change to users in LDAP.')
parser.add_argument('--verbose', '-v', action='count', default=0,
help="Set the verbosity level.")
parser.add_argument('--nossl', action="store_true",
help="Connect without SSL.")
parser.add_argument('--environment', '-e',
help="Use one of the environments defined in ~/.ldap_envs instead.")
parser.add_argument('--dry-run', '-n', action="store_true",
help="Do not make changes, use with -v")
parser.add_argument('--log', '-l',
help="Log to file instead of stdout, overwrites file")
# TODO: Add dry-run option to show what changes would be made.
# TODO: Make verbose do something.
# Command line environment options.
parser.add_argument('--host', help="The LDAP host URL.")
parser.add_argument('--port', help="The LDAP port.", default="636")
parser.add_argument('--bind-dn', help="The DN to bind as.")
parser.add_argument('--password', help="The password for the bind DN.")
parser.add_argument('--base-dn', help="The base DN from which to search.")
# The action we actually want to take.
parser.add_argument('--filter', help="An LDAP filter to limit the DNs operated on.",
default="(objectClass=*)")
parser.add_argument('change_attr', help="The attribute to be changed.")
parser.add_argument('regexp', help="A regexp used to determine the new value of change_attr.")
parser.add_argument('replace', help="The value to substitute into the new value of change_attr.")
CONFIG_KEYS = ['host', 'port', 'bind_dn', 'password', 'base_dn']
def main():
args = parser.parse_args()
setup_logging(args)
if args.dry_run:
logger.info("Dry run mode, no changes will be made")
target = load_config(args)
connection = connect(args, target)
search_results = search(args, target, connection)
change_set = apply_regex(args, search_results)
commit(args, connection, change_set)
disconnect(connection)
def setup_logging(args):
levels = {
0: logging.CRITICAL,
1: logging.INFO,
2: logging.DEBUG
}
logger.setLevel(levels.get(args.verbose, logging.DEBUG))
if args.log:
logger.addHandler(logging.FileHandler(args.log))
else:
logger.addHandler(logging.StreamHandler(sys.stdout))
def load_config(args):
# Load any environment configurations.
config = configparser.ConfigParser()
config.read(['.ldap_envs', os.path.expanduser('~/.ldap_envs')])
# Build a record of the target environment.
if args.environment is not None:
try:
logger.debug("Reading from environment %s", args.environment)
target = dict(config[args.environment])
except KeyError:
logger.critical("environment %s does not exist", args.environment)
sys.exit(1)
else:
# Default all values to None
logger.info("No environment given, reading from CLI flags")
target = dict.fromkeys(CONFIG_KEYS)
# Overwrite default/environment config values with contents of args
for config_key in CONFIG_KEYS:
if getattr(args, config_key) is not None:
target[config_key] = getattr(args, config_key)
# Make sure we have all the necessary parameters one way or another.
soft_fail = False
for key, value in target.items():
if value is None:
logger.critical("No value for parameter: %s", key)
soft_fail = True
if soft_fail:
sys.exit(1)
return target
def connect(args, target):
# Open a connection to the LDAP server.
logger.debug("Connecting to %s:%s, SSL=%r", target['host'], target['port'],
not args.nossl)
if args.nossl:
server = Server(target['host'], port=int(target['port']))
else:
server = Server(target['host'], port=int(target['port']), use_ssl=True)
logger.debug("Authenticating with user=%s, password=<omitted>",
target['bind_dn'])
return Connection(server, user=target['bind_dn'],
password=target['password'], auto_bind=True)
def search(args, target, connection):
# Find our set of target DNs.
connection.search(target['base_dn'], args.filter,
attributes=[args.change_attr])
results = {}
for record in connection.response:
results[record['dn']] = record['attributes'][args.change_attr]
logger.info("Retrieved %d records", len(results))
return results
def apply_regex(args, search_results):
regexp = re.compile(args.regexp)
change_set = {}
for dn, attributes in search_results.items():
new_values = []
should_change = False
for attr in attributes:
new_attr = regexp.sub(args.replace, attr)
if new_attr != attr:
should_change = True
logger.info("Would modify %s: %r -> %r", dn, attr, new_attr)
else:
logger.info("Not modifying %s value: %r", dn, attr)
new_values.append(new_attr)
if should_change:
change_set[dn] = {
args.change_attr: (MODIFY_REPLACE, new_values),
}
else:
logger.info("Skipping %s: No change necessary", dn)
return change_set
def commit(args, connection, change_set):
if not args.dry_run:
# Set the new values in LDAP.
for dn, attributes in change_set.items():
logger.info("Modifying %s with %r", dn, attributes)
connection.modify(dn, attributes)
logger.info("Modified: %s: %s", dn,
connection.result['description'])
def disconnect(connection):
logger.debug("Disconnecting from server")
connection.unbind()
if __name__ == "__main__":
main()